[ISSUE #301] Adding GET_CONSUMER_RUNNING_INFO (#311)
* Adding GET_CONSUMER_RUNNING_INFO impl
* add Marshal ConsumerRunningInfo Test Case
diff --git a/consumer/consumer.go b/consumer/consumer.go
index 13cef4e..070cd19 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -236,12 +236,13 @@
*
* See <a href="http://rocketmq.apache.org/docs/core-concept/">here</a> for further discussion.
*/
- consumerGroup string
- model MessageModel
- allocate func(string, string, []*primitive.MessageQueue, []string) []*primitive.MessageQueue
- unitMode bool
- consumeOrderly bool
- fromWhere ConsumeFromWhere
+ consumerGroup string
+ model MessageModel
+ allocate func(string, string, []*primitive.MessageQueue, []string) []*primitive.MessageQueue
+ unitMode bool
+ consumeOrderly bool
+ fromWhere ConsumeFromWhere
+ consumerStartTimestamp int64
cType ConsumeType
client internal.RMQClient
@@ -250,7 +251,7 @@
pause bool
once sync.Once
option consumerOptions
- // key: int, hash(*primitive.MessageQueue)
+ // key: primitive.MessageQueue
// value: *processQueue
processQueueTable sync.Map
@@ -287,7 +288,7 @@
dc.client.UpdateTopicRouteInfo()
dc.client.Start()
dc.state = internal.StateRunning
-
+ dc.consumerStartTimestamp = time.Now().UnixNano() / int64(time.Millisecond)
return nil
}
diff --git a/consumer/process_queue.go b/consumer/process_queue.go
index 13b2d25..4f4dc0c 100644
--- a/consumer/process_queue.go
+++ b/consumer/process_queue.go
@@ -28,6 +28,7 @@
gods_util "github.com/emirpasic/gods/utils"
uatomic "go.uber.org/atomic"
+ "github.com/apache/rocketmq-client-go/internal"
"github.com/apache/rocketmq-client-go/primitive"
"github.com/apache/rocketmq-client-go/rlog"
)
@@ -277,6 +278,28 @@
return -1
}
+func (pq *processQueue) MinOrderlyCache() int64 {
+ if pq.consumingMsgOrderlyTreeMap.Empty() {
+ return -1
+ }
+ k, _ := pq.consumingMsgOrderlyTreeMap.Min()
+ if k != nil {
+ return k.(int64)
+ }
+ return -1
+}
+
+func (pq *processQueue) MaxOrderlyCache() int64 {
+ if pq.consumingMsgOrderlyTreeMap.Empty() {
+ return -1
+ }
+ k, _ := pq.consumingMsgOrderlyTreeMap.Max()
+ if k != nil {
+ return k.(int64)
+ }
+ return -1
+}
+
func (pq *processQueue) clear() {
pq.mutex.Lock()
pq.msgCache.Clear()
@@ -302,3 +325,31 @@
pq.consumingMsgOrderlyTreeMap.Clear()
return offset + 1
}
+
+func (pq *processQueue) currentInfo() internal.ProcessQueueInfo {
+ pq.mutex.RLock()
+ defer pq.mutex.RUnlock()
+ info := internal.ProcessQueueInfo{
+ Locked: pq.locked.Load(),
+ TryUnlockTimes: pq.tryUnlockTimes,
+ LastLockTimestamp: pq.lastLockTime.UnixNano() / 10e6,
+ Dropped: pq.dropped.Load(),
+ LastPullTimestamp: pq.lastPullTime.UnixNano() / 10e6,
+ LastConsumeTimestamp: pq.lastConsumeTime.UnixNano() / 10e6,
+ }
+
+ if !pq.msgCache.Empty() {
+ info.CachedMsgMinOffset = pq.Min()
+ info.CachedMsgMaxOffset = pq.Max()
+ info.CachedMsgCount = pq.msgCache.Size()
+ info.CachedMsgSizeInMiB = pq.cachedMsgSize / int64(1024*1024)
+ }
+
+ if !pq.consumingMsgOrderlyTreeMap.Empty() {
+ info.TransactionMsgMinOffset = pq.MinOrderlyCache()
+ info.TransactionMsgMaxOffset = pq.MaxOrderlyCache()
+ info.TransactionMsgCount = pq.consumingMsgOrderlyTreeMap.Size()
+ }
+
+ return info
+}
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 3c5b79a..98b0033 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -273,6 +273,45 @@
return pc.unitMode
}
+func (pc *pushConsumer) GetConsumerRunningInfo() *internal.ConsumerRunningInfo {
+ info := internal.NewConsumerRunningInfo()
+
+ pc.subscriptionDataTable.Range(func(key, value interface{}) bool {
+ topic := key.(string)
+ info.SubscriptionData[value.(*internal.SubscriptionData)] = true
+ status := internal.ConsumeStatus{
+ PullRT: getPullRT(topic, pc.consumerGroup).avgpt,
+ PullTPS: getPullTPS(topic, pc.consumerGroup).tps,
+ ConsumeRT: getConsumeRT(topic, pc.consumerGroup).avgpt,
+ ConsumeOKTPS: getConsumeOKTPS(topic, pc.consumerGroup).tps,
+ ConsumeFailedTPS: getConsumeFailedTPS(topic, pc.consumerGroup).tps,
+ ConsumeFailedMsgs: topicAndGroupConsumeFailedTPS.getStatsDataInHour(topic + "@" + pc.consumerGroup).sum,
+ }
+ info.StatusTable[topic] = status
+ return true
+ })
+
+ pc.processQueueTable.Range(func(key, value interface{}) bool {
+ mq := key.(primitive.MessageQueue)
+ pq := value.(*processQueue)
+ pInfo := pq.currentInfo()
+ pInfo.CommitOffset = pc.storage.read(&mq, _ReadMemoryThenStore)
+ info.MQTable[mq] = pInfo
+ return true
+ })
+
+ nsAddr := ""
+ for _, value := range pc.namesrv.AddrList() {
+ nsAddr += fmt.Sprintf("%s;", value)
+ }
+ info.Properties[internal.PropNameServerAddr] = nsAddr
+ info.Properties[internal.PropConsumeType] = string(pc.cType)
+ info.Properties[internal.PropConsumeOrderly] = strconv.FormatBool(pc.consumeOrderly)
+ info.Properties[internal.PropThreadPoolCoreSize] = "-1"
+ info.Properties[internal.PropConsumerStartTimestamp] = strconv.FormatInt(pc.consumerStartTimestamp, 10)
+ return info
+}
+
func (pc *pushConsumer) messageQueueChanged(topic string, mqAll, mqDivided []*primitive.MessageQueue) {
v, exit := pc.subscriptionDataTable.Load(topic)
if !exit {
diff --git a/internal/client.go b/internal/client.go
index 3336944..d9c5f6a 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -18,7 +18,6 @@
package internal
import (
- "bytes"
"context"
"errors"
"fmt"
@@ -36,6 +35,7 @@
)
const (
+ clientVersion = "v2.0.0-alpha3"
defaultTraceRegionID = "DefaultRegion"
// tracing message switch
@@ -83,6 +83,7 @@
SubscriptionDataList() []*SubscriptionData
Rebalance()
IsUnitMode() bool
+ GetConsumerRunningInfo() *ConsumerRunningInfo
}
func DefaultClientOptions() ClientOptions {
@@ -220,8 +221,30 @@
client.remoteClient.RegisterRequestFunc(ReqGetConsumerRunningInfo, func(req *remote.RemotingCommand, addr net.Addr) *remote.RemotingCommand {
rlog.Info("receive get consumer running info request...", nil)
+ header := new(GetConsumerRunningInfoHeader)
+ header.Decode(req.ExtFields)
+ val, exist := clientMap.Load(header.clientID)
res := remote.NewRemotingCommand(ResError, nil, nil)
- res.Remark = "the go client has not supported consumer running info"
+ if !exist {
+ res.Remark = fmt.Sprintf("Can't find specified client instance of: %s", header.clientID)
+ } else {
+ cli, ok := val.(*rmqClient)
+ var runningInfo *ConsumerRunningInfo
+ if ok {
+ runningInfo = cli.getConsumerRunningInfo(header.consumerGroup)
+ }
+ if runningInfo != nil {
+ res.Code = ResSuccess
+ data, err := runningInfo.Encode()
+ if err != nil {
+ res.Remark = fmt.Sprintf("json marshal error: %s", err.Error())
+ } else {
+ res.Body = data
+ }
+ } else {
+ res.Remark = "there is unexpected error when get running info, please check log"
+ }
+ }
return res
})
}
@@ -659,6 +682,18 @@
return result
}
+func (c *rmqClient) getConsumerRunningInfo(group string) *ConsumerRunningInfo {
+ consumer, exist := c.consumerMap.Load(group)
+ if !exist {
+ return nil
+ }
+ info := consumer.(InnerConsumer).GetConsumerRunningInfo()
+ if info != nil {
+ info.Properties[PropClientVersion] = clientVersion
+ }
+ return info
+}
+
func routeData2SubscribeInfo(topic string, data *TopicRouteData) []*primitive.MessageQueue {
list := make([]*primitive.MessageQueue, 0)
for idx := range data.QueueDataList {
@@ -676,16 +711,6 @@
return list
}
-func encodeMessages(message []*primitive.Message) []byte {
- var buffer bytes.Buffer
- index := 0
- for index < len(message) {
- buffer.Write(message[index].Body)
- index++
- }
- return buffer.Bytes()
-}
-
func brokerVIPChannel(brokerAddr string) string {
if !_VIPChannelEnable {
return brokerAddr
diff --git a/internal/mock_namesrv.go b/internal/mock_namesrv.go
index 630d4cc..19a2181 100644
--- a/internal/mock_namesrv.go
+++ b/internal/mock_namesrv.go
@@ -52,26 +52,31 @@
// AddBroker mocks base method
func (m *MockNamesrvs) AddBroker(routeData *TopicRouteData) {
+ m.ctrl.T.Helper()
m.ctrl.Call(m, "AddBroker", routeData)
}
// AddBroker indicates an expected call of AddBroker
func (mr *MockNamesrvsMockRecorder) AddBroker(routeData interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddBroker", reflect.TypeOf((*MockNamesrvs)(nil).AddBroker), routeData)
}
// cleanOfflineBroker mocks base method
func (m *MockNamesrvs) cleanOfflineBroker() {
+ m.ctrl.T.Helper()
m.ctrl.Call(m, "cleanOfflineBroker")
}
// cleanOfflineBroker indicates an expected call of cleanOfflineBroker
func (mr *MockNamesrvsMockRecorder) cleanOfflineBroker() *gomock.Call {
+ mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "cleanOfflineBroker", reflect.TypeOf((*MockNamesrvs)(nil).cleanOfflineBroker))
}
// UpdateTopicRouteInfo mocks base method
func (m *MockNamesrvs) UpdateTopicRouteInfo(topic string) *TopicRouteData {
+ m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "UpdateTopicRouteInfo", topic)
ret0, _ := ret[0].(*TopicRouteData)
return ret0
@@ -79,11 +84,13 @@
// UpdateTopicRouteInfo indicates an expected call of UpdateTopicRouteInfo
func (mr *MockNamesrvsMockRecorder) UpdateTopicRouteInfo(topic interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateTopicRouteInfo", reflect.TypeOf((*MockNamesrvs)(nil).UpdateTopicRouteInfo), topic)
}
// FetchPublishMessageQueues mocks base method
func (m *MockNamesrvs) FetchPublishMessageQueues(topic string) ([]*primitive.MessageQueue, error) {
+ m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "FetchPublishMessageQueues", topic)
ret0, _ := ret[0].([]*primitive.MessageQueue)
ret1, _ := ret[1].(error)
@@ -92,11 +99,13 @@
// FetchPublishMessageQueues indicates an expected call of FetchPublishMessageQueues
func (mr *MockNamesrvsMockRecorder) FetchPublishMessageQueues(topic interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchPublishMessageQueues", reflect.TypeOf((*MockNamesrvs)(nil).FetchPublishMessageQueues), topic)
}
// FindBrokerAddrByTopic mocks base method
func (m *MockNamesrvs) FindBrokerAddrByTopic(topic string) string {
+ m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "FindBrokerAddrByTopic", topic)
ret0, _ := ret[0].(string)
return ret0
@@ -104,11 +113,13 @@
// FindBrokerAddrByTopic indicates an expected call of FindBrokerAddrByTopic
func (mr *MockNamesrvsMockRecorder) FindBrokerAddrByTopic(topic interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindBrokerAddrByTopic", reflect.TypeOf((*MockNamesrvs)(nil).FindBrokerAddrByTopic), topic)
}
// FindBrokerAddrByName mocks base method
func (m *MockNamesrvs) FindBrokerAddrByName(brokerName string) string {
+ m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "FindBrokerAddrByName", brokerName)
ret0, _ := ret[0].(string)
return ret0
@@ -116,11 +127,13 @@
// FindBrokerAddrByName indicates an expected call of FindBrokerAddrByName
func (mr *MockNamesrvsMockRecorder) FindBrokerAddrByName(brokerName interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindBrokerAddrByName", reflect.TypeOf((*MockNamesrvs)(nil).FindBrokerAddrByName), brokerName)
}
// FindBrokerAddressInSubscribe mocks base method
func (m *MockNamesrvs) FindBrokerAddressInSubscribe(brokerName string, brokerId int64, onlyThisBroker bool) *FindBrokerResult {
+ m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "FindBrokerAddressInSubscribe", brokerName, brokerId, onlyThisBroker)
ret0, _ := ret[0].(*FindBrokerResult)
return ret0
@@ -128,11 +141,13 @@
// FindBrokerAddressInSubscribe indicates an expected call of FindBrokerAddressInSubscribe
func (mr *MockNamesrvsMockRecorder) FindBrokerAddressInSubscribe(brokerName, brokerId, onlyThisBroker interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindBrokerAddressInSubscribe", reflect.TypeOf((*MockNamesrvs)(nil).FindBrokerAddressInSubscribe), brokerName, brokerId, onlyThisBroker)
}
// FetchSubscribeMessageQueues mocks base method
func (m *MockNamesrvs) FetchSubscribeMessageQueues(topic string) ([]*primitive.MessageQueue, error) {
+ m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "FetchSubscribeMessageQueues", topic)
ret0, _ := ret[0].([]*primitive.MessageQueue)
ret1, _ := ret[1].(error)
@@ -141,5 +156,20 @@
// FetchSubscribeMessageQueues indicates an expected call of FetchSubscribeMessageQueues
func (mr *MockNamesrvsMockRecorder) FetchSubscribeMessageQueues(topic interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchSubscribeMessageQueues", reflect.TypeOf((*MockNamesrvs)(nil).FetchSubscribeMessageQueues), topic)
}
+
+// AddrList mocks base method
+func (m *MockNamesrvs) AddrList() []string {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "AddrList")
+ ret0, _ := ret[0].([]string)
+ return ret0
+}
+
+// AddrList indicates an expected call of AddrList
+func (mr *MockNamesrvsMockRecorder) AddrList() *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddrList", reflect.TypeOf((*MockNamesrvs)(nil).AddrList))
+}
diff --git a/internal/model.go b/internal/model.go
index 8c95458..ea1fdac 100644
--- a/internal/model.go
+++ b/internal/model.go
@@ -18,9 +18,14 @@
package internal
import (
+ "bytes"
"encoding/json"
+ "fmt"
+ "sort"
+ "strings"
"github.com/apache/rocketmq-client-go/internal/utils"
+ "github.com/apache/rocketmq-client-go/primitive"
"github.com/apache/rocketmq-client-go/rlog"
)
@@ -100,3 +105,158 @@
rlog.Debug("heartbeat: "+string(d), nil)
return d
}
+
+const (
+ PropNameServerAddr = "PROP_NAMESERVER_ADDR"
+ PropThreadPoolCoreSize = "PROP_THREADPOOL_CORE_SIZE"
+ PropConsumeOrderly = "PROP_CONSUMEORDERLY"
+ PropConsumeType = "PROP_CONSUME_TYPE"
+ PropClientVersion = "PROP_CLIENT_VERSION"
+ PropConsumerStartTimestamp = "PROP_CONSUMER_START_TIMESTAMP"
+)
+
+type ProcessQueueInfo struct {
+ CommitOffset int64 `json:"commitOffset"`
+ CachedMsgMinOffset int64 `json:"cachedMsgMinOffset"`
+ CachedMsgMaxOffset int64 `json:"cachedMsgMaxOffset"`
+ CachedMsgCount int `json:"cachedMsgCount"`
+ CachedMsgSizeInMiB int64 `json:"cachedMsgSizeInMiB"`
+ TransactionMsgMinOffset int64 `json:"transactionMsgMinOffset"`
+ TransactionMsgMaxOffset int64 `json:"transactionMsgMaxOffset"`
+ TransactionMsgCount int `json:"transactionMsgCount"`
+ Locked bool `json:"locked"`
+ TryUnlockTimes int64 `json:"tryUnlockTimes"`
+ LastLockTimestamp int64 `json:"lastLockTimestamp"`
+ Dropped bool `json:"dropped"`
+ LastPullTimestamp int64 `json:"lastPullTimestamp"`
+ LastConsumeTimestamp int64 `json:"lastConsumeTimestamp"`
+}
+
+type ConsumeStatus struct {
+ PullRT float64 `json:"pullRT"`
+ PullTPS float64 `json:"pullTPS"`
+ ConsumeRT float64 `json:"consumeRT"`
+ ConsumeOKTPS float64 `json:"consumeOKTPS"`
+ ConsumeFailedTPS float64 `json:"consumeFailedTPS"`
+ ConsumeFailedMsgs int64 `json:"consumeFailedMsgs"`
+}
+
+type ConsumerRunningInfo struct {
+ Properties map[string]string
+ SubscriptionData map[*SubscriptionData]bool
+ MQTable map[primitive.MessageQueue]ProcessQueueInfo
+ StatusTable map[string]ConsumeStatus
+}
+
+func (info ConsumerRunningInfo) Encode() ([]byte, error) {
+ data, err := json.Marshal(info.Properties)
+ if err != nil {
+ return nil, err
+ }
+ jsonData := fmt.Sprintf("{\"%s\":%s", "properties", string(data))
+
+ data, err = json.Marshal(info.StatusTable)
+ if err != nil {
+ return nil, err
+ }
+ jsonData = fmt.Sprintf("%s,\"%s\":%s", jsonData, "statusTable", string(data))
+
+ subs := make([]*SubscriptionData, len(info.SubscriptionData))
+ idx := 0
+ for k := range info.SubscriptionData {
+ subs[idx] = k
+ idx++
+ }
+
+ // make sure test case table
+ sort.Slice(subs, func(i, j int) bool {
+ sub1 := subs[i]
+ sub2 := subs[j]
+ if sub1.ClassFilterMode != sub2.ClassFilterMode {
+ return sub1.ClassFilterMode == false
+ }
+ com := strings.Compare(sub1.Topic, sub1.Topic)
+ if com != 0 {
+ return com > 0
+ }
+
+ com = strings.Compare(sub1.SubString, sub1.SubString)
+ if com != 0 {
+ return com > 0
+ }
+
+ if sub1.SubVersion != sub2.SubVersion {
+ return sub1.SubVersion > sub2.SubVersion
+ }
+
+ com = strings.Compare(sub1.ExpType, sub1.ExpType)
+ if com != 0 {
+ return com > 0
+ }
+
+ v1, _ := sub1.Tags.MarshalJSON()
+ v2, _ := sub2.Tags.MarshalJSON()
+ com = bytes.Compare(v1, v2)
+ if com != 0 {
+ return com > 0
+ }
+
+ v1, _ = sub1.Codes.MarshalJSON()
+ v2, _ = sub2.Codes.MarshalJSON()
+ com = bytes.Compare(v1, v2)
+ if com != 0 {
+ return com > 0
+ }
+ return true
+ })
+
+ data, err = json.Marshal(subs)
+ if err != nil {
+ return nil, err
+ }
+ jsonData = fmt.Sprintf("%s,\"%s\":%s", jsonData, "subscriptionSet", string(data))
+
+ tableJson := ""
+ keys := make([]primitive.MessageQueue, 0)
+
+ for k := range info.MQTable {
+ keys = append(keys, k)
+ }
+
+ sort.Slice(keys, func(i, j int) bool {
+ q1 := keys[i]
+ q2 := keys[j]
+ com := strings.Compare(q1.Topic, q2.Topic)
+ if com != 0 {
+ return com < 0
+ }
+
+ com = strings.Compare(q1.BrokerName, q2.BrokerName)
+ if com != 0 {
+ return com < 0
+ }
+
+ return q1.QueueId < q2.QueueId
+ })
+
+ for idx := range keys {
+ dataK, err := json.Marshal(keys[idx])
+ if err != nil {
+ return nil, err
+ }
+ dataV, err := json.Marshal(info.MQTable[keys[idx]])
+ tableJson = fmt.Sprintf("%s,%s:%s", tableJson, string(dataK), string(dataV))
+ }
+ tableJson = strings.TrimLeft(tableJson, ",")
+ jsonData = fmt.Sprintf("%s,\"%s\":%s}", jsonData, "mqTable", fmt.Sprintf("{%s}", tableJson))
+ return []byte(jsonData), nil
+}
+
+func NewConsumerRunningInfo() *ConsumerRunningInfo {
+ return &ConsumerRunningInfo{
+ Properties: make(map[string]string),
+ SubscriptionData: make(map[*SubscriptionData]bool),
+ MQTable: make(map[primitive.MessageQueue]ProcessQueueInfo),
+ StatusTable: make(map[string]ConsumeStatus),
+ }
+}
diff --git a/internal/model_test.go b/internal/model_test.go
index 1dac4ec..56703e0 100644
--- a/internal/model_test.go
+++ b/internal/model_test.go
@@ -20,12 +20,14 @@
import (
"encoding/json"
"fmt"
+ "strings"
"testing"
. "github.com/smartystreets/goconvey/convey"
- "github.com/stretchr/testify/assert"
+ "github.com/tidwall/gjson"
"github.com/apache/rocketmq-client-go/internal/utils"
+ "github.com/apache/rocketmq-client-go/primitive"
)
func TestHeartbeatData(t *testing.T) {
@@ -43,7 +45,7 @@
set.Add(pData2)
v, err := json.Marshal(set)
- assert.Nil(t, err)
+ So(err, ShouldBeNil)
fmt.Printf("json producer set: %s", string(v))
})
@@ -61,8 +63,7 @@
hbt.ProducerDatas.Add(p2)
v, err := json.Marshal(hbt)
- //ShouldBeNil(t, err)
- assert.Nil(t, err)
+ So(err, ShouldBeNil)
fmt.Printf("json producer: %s\n", string(v))
})
@@ -79,8 +80,7 @@
hbt.ConsumerDatas.Add(c2)
v, err := json.Marshal(hbt)
- //ShouldBeNil(t, err)
- assert.Nil(t, err)
+ So(err, ShouldBeNil)
fmt.Printf("json consumer: %s\n", string(v))
})
@@ -108,10 +108,257 @@
hbt.ConsumerDatas.Add(c2)
v, err := json.Marshal(hbt)
- //ShouldBeNil(t, err)
- assert.Nil(t, err)
+ So(err, ShouldBeNil)
fmt.Printf("json producer & consumer: %s\n", string(v))
})
})
}
+
+func TestConsumerRunningInfo_MarshalJSON(t *testing.T) {
+ Convey("test ConsumerRunningInfo MarshalJson", t, func() {
+ props := map[string]string{
+ "maxReconsumeTimes": "-1",
+ "unitMode": "false",
+ "adjustThreadPoolNumsThreshold": "100000",
+ "consumerGroup": "mq-client-go-test%GID_GO_TEST",
+ "messageModel": "CLUSTERING",
+ "suspendCurrentQueueTimeMillis": "1000",
+ "pullThresholdSizeForTopic": "-1",
+ "pullThresholdSizeForQueue": "100",
+ "PROP_CLIENT_VERSION": "V4_5_1",
+ "consumeConcurrentlyMaxSpan": "2000",
+ "postSubscriptionWhenPull": "false",
+ "consumeTimestamp": "20191127013617",
+ "PROP_CONSUME_TYPE": "CONSUME_PASSIVELY",
+ "consumeTimeout": "15",
+ "consumeMessageBatchMaxSize": "1",
+ "PROP_THREADPOOL_CORE_SIZE": "20",
+ "pullInterval": "0",
+ "pullThresholdForQueue": "1000",
+ "pullThresholdForTopic": "-1",
+ "consumeFromWhere": "CONSUME_FROM_FIRST_OFFSET",
+ "PROP_NAMESERVER_ADDR": "mq-client-go-test.mq-internet-access.mq-internet.aliyuncs.com:80;",
+ "pullBatchSize": "32",
+ "consumeThreadMin": "20",
+ "PROP_CONSUMER_START_TIMESTAMP": "1574791577504",
+ "consumeThreadMax": "20",
+ "subscription": "{}",
+ "PROP_CONSUMEORDERLY": "false",
+ }
+ subData := map[*SubscriptionData]bool{
+ &SubscriptionData{
+ ClassFilterMode: false,
+ Codes: utils.NewSet(),
+ ExpType: "TAG",
+ SubString: "*",
+ SubVersion: 1574791579242,
+ Tags: utils.NewSet(),
+ Topic: "%RETRY%mq-client-go-test%GID_GO_TEST",
+ }: true,
+ &SubscriptionData{
+ ClassFilterMode: true,
+ Codes: utils.NewSet(),
+ ExpType: "TAG",
+ SubString: "*",
+ SubVersion: 1574791577523,
+ Tags: utils.NewSet(),
+ Topic: "mq-client-go-test%go-test",
+ }: true,
+ }
+ statusTable := map[string]ConsumeStatus{
+ "%RETRY%mq-client-go-test%GID_GO_TEST": {
+ PullRT: 11.11,
+ PullTPS: 22.22,
+ ConsumeRT: 33.33,
+ ConsumeOKTPS: 44.44,
+ ConsumeFailedTPS: 55.55,
+ ConsumeFailedMsgs: 666,
+ },
+ "mq-client-go-test%go-test": {
+ PullRT: 123,
+ PullTPS: 123,
+ ConsumeRT: 123,
+ ConsumeOKTPS: 123,
+ ConsumeFailedTPS: 123,
+ ConsumeFailedMsgs: 1234,
+ },
+ }
+ mqTable := map[primitive.MessageQueue]ProcessQueueInfo{
+ {
+ Topic: "%RETRY%mq-client-go-test%GID_GO_TEST",
+ BrokerName: "qd7internet-01",
+ QueueId: 0,
+ }: {
+ CommitOffset: 0,
+ CachedMsgMinOffset: 0,
+ CachedMsgMaxOffset: 0,
+ CachedMsgCount: 0,
+ CachedMsgSizeInMiB: 0,
+ TransactionMsgMinOffset: 0,
+ TransactionMsgMaxOffset: 0,
+ TransactionMsgCount: 0,
+ Locked: false,
+ TryUnlockTimes: 0,
+ LastLockTimestamp: 1574791579221,
+ Dropped: false,
+ LastPullTimestamp: 1574791579242,
+ LastConsumeTimestamp: 1574791579221,
+ },
+ {
+ Topic: "%RETRY%mq-client-go-test%GID_GO_TEST",
+ BrokerName: "qd7internet-01",
+ QueueId: 1,
+ }: {
+ CommitOffset: 1,
+ CachedMsgMinOffset: 2,
+ CachedMsgMaxOffset: 3,
+ CachedMsgCount: 4,
+ CachedMsgSizeInMiB: 5,
+ TransactionMsgMinOffset: 6,
+ TransactionMsgMaxOffset: 7,
+ TransactionMsgCount: 8,
+ Locked: true,
+ TryUnlockTimes: 9,
+ LastLockTimestamp: 1574791579221,
+ Dropped: false,
+ LastPullTimestamp: 1574791579242,
+ LastConsumeTimestamp: 1574791579221,
+ },
+ }
+ info := ConsumerRunningInfo{
+ Properties: props,
+ SubscriptionData: subData,
+ StatusTable: statusTable,
+ MQTable: mqTable,
+ }
+ data, err := info.Encode()
+ So(err, ShouldBeNil)
+ result := gjson.ParseBytes(data)
+ Convey("test Properties fields", func() {
+ r1 := result.Get("properties")
+ So(r1.Exists(), ShouldBeTrue)
+ m := r1.Map()
+ So(len(m), ShouldEqual, 27)
+
+ So(m["PROP_CLIENT_VERSION"], ShouldNotBeEmpty)
+ So(m["PROP_CLIENT_VERSION"].String(), ShouldEqual, "V4_5_1")
+
+ So(m["PROP_CONSUME_TYPE"], ShouldNotBeNil)
+ So(m["PROP_CONSUME_TYPE"].String(), ShouldEqual, "CONSUME_PASSIVELY")
+
+ So(m["PROP_THREADPOOL_CORE_SIZE"], ShouldNotBeNil)
+ So(m["PROP_THREADPOOL_CORE_SIZE"].String(), ShouldEqual, "20")
+
+ So(m["PROP_NAMESERVER_ADDR"], ShouldNotBeNil)
+ So(m["PROP_NAMESERVER_ADDR"].String(), ShouldEqual, "mq-client-go-test.mq-internet-access.mq-internet.aliyuncs.com:80;")
+
+ So(m["PROP_CONSUMER_START_TIMESTAMP"], ShouldNotBeNil)
+ So(m["PROP_CONSUMER_START_TIMESTAMP"].String(), ShouldEqual, "1574791577504")
+
+ So(m["PROP_CONSUMEORDERLY"], ShouldNotBeNil)
+ So(m["PROP_CONSUMEORDERLY"].String(), ShouldEqual, "false")
+ })
+ Convey("test SubscriptionData fields", func() {
+ r2 := result.Get("subscriptionSet")
+ So(r2.Exists(), ShouldBeTrue)
+ arr := r2.Array()
+ So(len(arr), ShouldEqual, 2)
+
+ m1 := arr[0].Map()
+ So(len(m1), ShouldEqual, 7)
+ So(m1["classFilterMode"].Bool(), ShouldEqual, false)
+ So(len(m1["codes"].Array()), ShouldEqual, 0)
+ So(m1["expressionType"].String(), ShouldEqual, "TAG")
+ So(m1["subString"].String(), ShouldEqual, "*")
+ So(m1["subVersion"].Int(), ShouldEqual, 1574791579242)
+ So(len(m1["tags"].Array()), ShouldEqual, 0)
+ So(m1["topic"].String(), ShouldEqual, "%RETRY%mq-client-go-test%GID_GO_TEST")
+
+ m2 := arr[1].Map()
+ So(len(m2), ShouldEqual, 7)
+ So(m2["classFilterMode"].Bool(), ShouldEqual, true)
+ So(len(m2["codes"].Array()), ShouldEqual, 0)
+ So(m2["expressionType"].String(), ShouldEqual, "TAG")
+ So(m2["subString"].String(), ShouldEqual, "*")
+ So(m2["subVersion"].Int(), ShouldEqual, 1574791577523)
+ So(len(m2["tags"].Array()), ShouldEqual, 0)
+ So(m2["topic"].String(), ShouldEqual, "mq-client-go-test%go-test")
+ })
+ Convey("test StatusTable fields", func() {
+ r3 := result.Get("statusTable")
+ So(r3.Exists(), ShouldBeTrue)
+ m := r3.Map()
+ So(len(m), ShouldEqual, 2)
+
+ status1 := m["mq-client-go-test%go-test"].Map()
+ So(len(status1), ShouldEqual, 6)
+ So(status1["pullRT"].Float(), ShouldEqual, 123)
+ So(status1["pullTPS"].Float(), ShouldEqual, 123)
+ So(status1["consumeRT"].Float(), ShouldEqual, 123)
+ So(status1["consumeOKTPS"].Float(), ShouldEqual, 123)
+ So(status1["consumeFailedTPS"].Float(), ShouldEqual, 123)
+ So(status1["consumeFailedMsgs"].Int(), ShouldEqual, 1234)
+
+ status2 := m["%RETRY%mq-client-go-test%GID_GO_TEST"].Map()
+ So(len(status2), ShouldEqual, 6)
+ So(status2["pullRT"].Float(), ShouldEqual, 11.11)
+ So(status2["pullTPS"].Float(), ShouldEqual, 22.22)
+ So(status2["consumeRT"].Float(), ShouldEqual, 33.33)
+ So(status2["consumeOKTPS"].Float(), ShouldEqual, 44.44)
+ So(status2["consumeFailedTPS"].Float(), ShouldEqual, 55.55)
+ So(status2["consumeFailedMsgs"].Int(), ShouldEqual, 666)
+ })
+ Convey("test MQTable fields", func() {
+ r4 := result.Get("mqTable")
+ So(r4.Exists(), ShouldBeTrue)
+ objNumbers := strings.Split(r4.String(), "},{")
+ So(len(objNumbers), ShouldEqual, 2)
+
+ obj1Str := objNumbers[0][1:len(objNumbers[0])] + "}"
+ obj1KV := strings.Split(obj1Str, "}:{")
+ So(len(obj1KV), ShouldEqual, 2)
+
+ obj1 := gjson.Parse("{" + obj1KV[1][0:len(obj1KV[1])])
+ So(obj1.Exists(), ShouldBeTrue)
+ obj1M := obj1.Map()
+ So(len(obj1M), ShouldEqual, 14)
+ So(obj1M["commitOffset"].Int(), ShouldEqual, 0)
+ So(obj1M["cachedMsgMinOffset"].Int(), ShouldEqual, 0)
+ So(obj1M["cachedMsgMaxOffset"].Int(), ShouldEqual, 0)
+ So(obj1M["cachedMsgCount"].Int(), ShouldEqual, 0)
+ So(obj1M["cachedMsgSizeInMiB"].Int(), ShouldEqual, 0)
+ So(obj1M["transactionMsgMinOffset"].Int(), ShouldEqual, 0)
+ So(obj1M["transactionMsgMaxOffset"].Int(), ShouldEqual, 0)
+ So(obj1M["transactionMsgCount"].Int(), ShouldEqual, 0)
+ So(obj1M["locked"].Bool(), ShouldEqual, false)
+ So(obj1M["tryUnlockTimes"].Int(), ShouldEqual, 0)
+ So(obj1M["lastLockTimestamp"].Int(), ShouldEqual, 1574791579221)
+ So(obj1M["dropped"].Bool(), ShouldEqual, false)
+ So(obj1M["lastPullTimestamp"].Int(), ShouldEqual, 1574791579242)
+ So(obj1M["lastConsumeTimestamp"].Int(), ShouldEqual, 1574791579221)
+
+ obj2Str := "{" + objNumbers[1][0:len(objNumbers[1])-1]
+ obj2KV := strings.Split(obj2Str, "}:{")
+ So(len(obj2KV), ShouldEqual, 2)
+ obj2 := gjson.Parse("{" + obj2KV[1][0:len(obj2KV[1])])
+ So(obj2.Exists(), ShouldBeTrue)
+ obj2M := obj2.Map()
+ So(len(obj2M), ShouldEqual, 14)
+ So(obj2M["commitOffset"].Int(), ShouldEqual, 1)
+ So(obj2M["cachedMsgMinOffset"].Int(), ShouldEqual, 2)
+ So(obj2M["cachedMsgMaxOffset"].Int(), ShouldEqual, 3)
+ So(obj2M["cachedMsgCount"].Int(), ShouldEqual, 4)
+ So(obj2M["cachedMsgSizeInMiB"].Int(), ShouldEqual, 5)
+ So(obj2M["transactionMsgMinOffset"].Int(), ShouldEqual, 6)
+ So(obj2M["transactionMsgMaxOffset"].Int(), ShouldEqual, 7)
+ So(obj2M["transactionMsgCount"].Int(), ShouldEqual, 8)
+ So(obj2M["locked"].Bool(), ShouldEqual, true)
+ So(obj2M["tryUnlockTimes"].Int(), ShouldEqual, 9)
+ So(obj2M["lastLockTimestamp"].Int(), ShouldEqual, 1574791579221)
+ So(obj2M["dropped"].Bool(), ShouldEqual, false)
+ So(obj2M["lastPullTimestamp"].Int(), ShouldEqual, 1574791579242)
+ So(obj2M["lastConsumeTimestamp"].Int(), ShouldEqual, 1574791579221)
+ })
+ })
+}
diff --git a/internal/namesrv.go b/internal/namesrv.go
index 3c20968..b9f1744 100644
--- a/internal/namesrv.go
+++ b/internal/namesrv.go
@@ -52,6 +52,8 @@
FindBrokerAddressInSubscribe(brokerName string, brokerId int64, onlyThisBroker bool) *FindBrokerResult
FetchSubscribeMessageQueues(topic string) ([]*primitive.MessageQueue, error)
+
+ AddrList() []string
}
// namesrvs rocketmq namesrv instance.
@@ -119,3 +121,7 @@
func (s *namesrvs) SetCredentials(credentials primitive.Credentials) {
s.nameSrvClient.RegisterInterceptor(remote.ACLInterceptor(credentials))
}
+
+func (s *namesrvs) AddrList() []string {
+ return s.srvs
+}
diff --git a/internal/request.go b/internal/request.go
index 76d0d7d..470fd35 100644
--- a/internal/request.go
+++ b/internal/request.go
@@ -78,10 +78,6 @@
return maps
}
-func (request *SendMessageRequestHeader) Decode(properties map[string]string) error {
- return nil
-}
-
type EndTransactionRequestHeader struct {
ProducerGroup string
TranStateTableOffset int64
@@ -123,23 +119,23 @@
return maps
}
-func (request *CheckTransactionStateRequestHeader) Decode(ext map[string]string) {
- if len(ext) == 0 {
+func (request *CheckTransactionStateRequestHeader) Decode(properties map[string]string) {
+ if len(properties) == 0 {
return
}
- if v, existed := ext["tranStateTableOffset"]; existed {
+ if v, existed := properties["tranStateTableOffset"]; existed {
request.TranStateTableOffset, _ = strconv.ParseInt(v, 10, 0)
}
- if v, existed := ext["commitLogOffset"]; existed {
+ if v, existed := properties["commitLogOffset"]; existed {
request.CommitLogOffset, _ = strconv.ParseInt(v, 10, 0)
}
- if v, existed := ext["msgId"]; existed {
+ if v, existed := properties["msgId"]; existed {
request.MsgId = v
}
- if v, existed := ext["transactionId"]; existed {
+ if v, existed := properties["transactionId"]; existed {
request.MsgId = v
}
- if v, existed := ext["offsetMsgId"]; existed {
+ if v, existed := properties["offsetMsgId"]; existed {
request.MsgId = v
}
}
@@ -273,6 +269,27 @@
return maps
}
-func (request *GetRouteInfoRequestHeader) Decode(properties map[string]string) error {
- return nil
+type GetConsumerRunningInfoHeader struct {
+ consumerGroup string
+ clientID string
+}
+
+func (request *GetConsumerRunningInfoHeader) Encode() map[string]string {
+ maps := make(map[string]string)
+ maps["consumerGroup"] = request.consumerGroup
+ maps["clientId"] = request.clientID
+ return maps
+}
+
+func (request *GetConsumerRunningInfoHeader) Decode(properties map[string]string) {
+ if len(properties) == 0 {
+ return
+ }
+ if v, existed := properties["consumerGroup"]; existed {
+ request.consumerGroup = v
+ }
+
+ if v, existed := properties["clientId"]; existed {
+ request.clientID = v
+ }
}