fix(reconsume): subMsgs should be used instead of msgs in consume goroutine (#504)
* fix(consume): subMsgs should be used instead of msgs in consuming goroutine
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index d2aee5b..25b8ce8 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -913,7 +913,7 @@
Properties: make(map[string]string),
ConsumerGroup: pc.consumerGroup,
MQ: mq,
- Msgs: msgs,
+ Msgs: subMsgs,
}
ctx := context.Background()
ctx = primitive.WithConsumerCtx(ctx, msgCtx)
@@ -944,14 +944,14 @@
} else {
increaseConsumeFailedTPS(pc.consumerGroup, mq.Topic, len(subMsgs))
if pc.model == BroadCasting {
- for i := 0; i < len(msgs); i++ {
+ for i := 0; i < len(subMsgs); i++ {
rlog.Warning("BROADCASTING, the message consume failed, drop it", map[string]interface{}{
"message": subMsgs[i],
})
}
} else {
- for i := 0; i < len(msgs); i++ {
- msg := msgs[i]
+ for i := 0; i < len(subMsgs); i++ {
+ msg := subMsgs[i]
if !pc.sendMessageBack(mq.BrokerName, msg, concurrentCtx.DelayLevelWhenNextConsume) {
msg.ReconsumeTimes += 1
msgBackFailed = append(msgBackFailed, msg)
@@ -973,7 +973,7 @@
} else {
rlog.Warning("processQueue is dropped without process consume result.", map[string]interface{}{
rlog.LogKeyMessageQueue: mq,
- "message": msgs,
+ "message": subMsgs,
})
}
})