feat(internal): support reset consumer offset (#682)
Co-authored-by: zhangxu16 <zhangxu16@xiaomi.com>
diff --git a/consumer/process_queue.go b/consumer/process_queue.go
index 0e9d8ec..76a9236 100644
--- a/consumer/process_queue.go
+++ b/consumer/process_queue.go
@@ -343,6 +343,7 @@
func (pq *processQueue) clear() {
pq.mutex.Lock()
+ defer pq.mutex.Unlock()
pq.msgCache.Clear()
pq.cachedMsgCount = 0
pq.cachedMsgSize = 0
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index ec44a1e..c84ce84 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -831,7 +831,7 @@
rlog.Info(fmt.Sprintf("resume consumer: %s", pc.consumerGroup), nil)
}
-func (pc *pushConsumer) resetOffset(topic string, table map[primitive.MessageQueue]int64) {
+func (pc *pushConsumer) ResetOffset(topic string, table map[primitive.MessageQueue]int64) {
//topic := cmd.ExtFields["topic"]
//group := cmd.ExtFields["group"]
//if topic == "" || group == "" {
@@ -857,11 +857,13 @@
// rlog.Infof("[reset-offset] consumer dose not exist. group=%s", group)
// return
//}
+ pc.suspend()
+ defer pc.resume()
pc.processQueueTable.Range(func(key, value interface{}) bool {
mq := key.(primitive.MessageQueue)
pq := value.(*processQueue)
- if _, ok := table[mq]; !ok {
+ if _, ok := table[mq]; ok && mq.Topic == topic {
pq.WithDropped(true)
pq.clear()
}
@@ -872,16 +874,17 @@
if !exist {
return
}
- queuesOfTopic := v.([]primitive.MessageQueue)
+ queuesOfTopic := v.([]*primitive.MessageQueue)
for _, k := range queuesOfTopic {
- if _, ok := table[k]; ok {
- pc.storage.update(&k, table[k], false)
+ if _, ok := table[*k]; ok {
+ pc.storage.update(k, table[*k], false)
v, exist := pc.processQueueTable.Load(k)
if !exist {
continue
}
pq := v.(*processQueue)
- pc.removeUnnecessaryMessageQueue(&k, pq)
+ pc.removeUnnecessaryMessageQueue(k, pq)
+ pc.processQueueTable.Delete(k)
}
}
}
diff --git a/internal/client.go b/internal/client.go
index 6e665ea..3a09ea8 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -88,6 +88,7 @@
GetcType() string
GetModel() string
GetWhere() string
+ ResetOffset(topic string, table map[primitive.MessageQueue]int64)
}
func DefaultClientOptions() ClientOptions {
@@ -283,6 +284,23 @@
}
return res
})
+
+ client.remoteClient.RegisterRequestFunc(ReqResetConsumerOffset, func(req *remote.RemotingCommand, addr net.Addr) *remote.RemotingCommand {
+ rlog.Info("receive reset consumer offset request...", map[string]interface{}{
+ rlog.LogKeyBroker: addr.String(),
+ rlog.LogKeyTopic: req.ExtFields["topic"],
+ rlog.LogKeyConsumerGroup: req.ExtFields["group"],
+ rlog.LogKeyTimeStamp: req.ExtFields["timestamp"],
+ })
+ header := new(ResetOffsetHeader)
+ header.Decode(req.ExtFields)
+
+ body := new(ResetOffsetBody)
+ body.Decode(req.Body)
+
+ client.resetOffset(header.topic, header.group, body.OffsetTable)
+ return nil
+ })
}
return actual.(*rmqClient)
}
@@ -777,6 +795,15 @@
return result
}
+func (c *rmqClient) resetOffset(topic string, group string, offsetTable map[primitive.MessageQueue]int64) {
+ consumer, exist := c.consumerMap.Load(group)
+ if !exist {
+ rlog.Warning("group "+group+" do not exists", nil)
+ return
+ }
+ consumer.(InnerConsumer).ResetOffset(topic, offsetTable)
+}
+
func (c *rmqClient) getConsumerRunningInfo(group string) *ConsumerRunningInfo {
consumer, exist := c.consumerMap.Load(group)
if !exist {
diff --git a/internal/model.go b/internal/model.go
index 934c610..d7f2057 100644
--- a/internal/model.go
+++ b/internal/model.go
@@ -21,7 +21,9 @@
"bytes"
"encoding/json"
"fmt"
+ "github.com/tidwall/gjson"
"sort"
+ "strconv"
"strings"
"github.com/apache/rocketmq-client-go/v2/internal/utils"
@@ -288,3 +290,48 @@
}
return data, nil
}
+
+type ResetOffsetBody struct {
+ OffsetTable map[primitive.MessageQueue]int64 `json:"offsetTable"`
+}
+
+func (resetOffsetBody *ResetOffsetBody) Decode(body []byte) {
+ result := gjson.ParseBytes(body)
+ rlog.Debug("offset table string "+result.Get("offsetTable").String(), nil)
+
+ offsetTable := make(map[primitive.MessageQueue]int64, 0)
+ offsetTableArray := strings.Split(result.Get("offsetTable").String(), "],[")
+ for index, v := range offsetTableArray {
+ kvArray := strings.Split(v, "},")
+
+ var kstr, vstr string
+ if index == len(offsetTableArray)-1 {
+ vstr = kvArray[1][:len(kvArray[1])-2]
+ } else {
+ vstr = kvArray[1]
+ }
+ offset, err := strconv.ParseInt(vstr, 10, 64)
+ if err != nil {
+ rlog.Error("Unmarshal offset error", map[string]interface{}{
+ rlog.LogKeyUnderlayError: err,
+ })
+ return
+ }
+
+ if index == 0 {
+ kstr = kvArray[0][2:len(kvArray[0])] + "}"
+ } else {
+ kstr = kvArray[0] + "}"
+ }
+ kObj := new(primitive.MessageQueue)
+ err = jsoniter.Unmarshal([]byte(kstr), &kObj)
+ if err != nil {
+ rlog.Error("Unmarshal message queue error", map[string]interface{}{
+ rlog.LogKeyUnderlayError: err,
+ })
+ return
+ }
+ offsetTable[*kObj] = offset
+ }
+ resetOffsetBody.OffsetTable = offsetTable
+}
diff --git a/internal/model_test.go b/internal/model_test.go
index 57ff0af..56eeb24 100644
--- a/internal/model_test.go
+++ b/internal/model_test.go
@@ -402,5 +402,21 @@
fmt.Printf("json consumeMessageDirectlyResult: %s\n", string(data))
})
})
+}
+func TestRestOffsetBody_MarshalJSON(t *testing.T) {
+ Convey("test ResetOffset Body Decode", t, func() {
+ body := "{\"offsetTable\":[[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":5},23354233],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":4},23354245],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":7},23354203],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":6},23354312],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":1},23373517],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":0},23373350],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":3},23373424],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":2},23373382]]}"
+ resetOffsetBody := new(ResetOffsetBody)
+ resetOffsetBody.Decode([]byte(body))
+ offsetTable := resetOffsetBody.OffsetTable
+ So(offsetTable, ShouldNotBeNil)
+ So(len(offsetTable), ShouldEqual, 8)
+ messageQueue := primitive.MessageQueue{
+ Topic: "zx_tst",
+ BrokerName: "tjwqtst-common-rocketmq-raft0",
+ QueueId: 5,
+ }
+ So(offsetTable[messageQueue], ShouldEqual, 23354233)
+ })
}
diff --git a/internal/request.go b/internal/request.go
index 237d711..0e3d8e1 100644
--- a/internal/request.go
+++ b/internal/request.go
@@ -48,7 +48,7 @@
ReqGetAllTopicListFromNameServer = int16(206)
ReqDeleteTopicInBroker = int16(215)
ReqDeleteTopicInNameSrv = int16(216)
- ReqResetConsuemrOffset = int16(220)
+ ReqResetConsumerOffset = int16(220)
ReqGetConsumerRunningInfo = int16(307)
ReqConsumeMessageDirectly = int16(309)
)
@@ -408,6 +408,39 @@
return maps
}
+type ResetOffsetHeader struct {
+ topic string
+ group string
+ timestamp int64
+ isForce bool
+}
+
+func (request *ResetOffsetHeader) Encode() map[string]string {
+ maps := make(map[string]string)
+ maps["topic"] = request.topic
+ maps["group"] = request.group
+ maps["timestamp"] = strconv.FormatInt(request.timestamp, 10)
+ return maps
+}
+
+func (request *ResetOffsetHeader) Decode(properties map[string]string) {
+ if len(properties) == 0 {
+ return
+ }
+
+ if v, existed := properties["topic"]; existed {
+ request.topic = v
+ }
+
+ if v, existed := properties["group"]; existed {
+ request.group = v
+ }
+
+ if v, existed := properties["timestamp"]; existed {
+ request.timestamp, _ = strconv.ParseInt(v, 10, 0)
+ }
+}
+
type ConsumeMessageDirectlyHeader struct {
consumerGroup string
clientID string
diff --git a/rlog/log.go b/rlog/log.go
index 1d850c3..382f5aa 100644
--- a/rlog/log.go
+++ b/rlog/log.go
@@ -33,6 +33,7 @@
LogKeyValueChangedFrom = "changedFrom"
LogKeyValueChangedTo = "changeTo"
LogKeyPullRequest = "PullRequest"
+ LogKeyTimeStamp = "timestamp"
)
type Logger interface {