fix: orderly pusher consumer may commit queue offset too early
diff --git a/src/consumer/DefaultMQPushConsumerImpl.cpp b/src/consumer/DefaultMQPushConsumerImpl.cpp
index 1aa0b76..4a79595 100644
--- a/src/consumer/DefaultMQPushConsumerImpl.cpp
+++ b/src/consumer/DefaultMQPushConsumerImpl.cpp
@@ -96,9 +96,7 @@
}
pullRequest->setNextOffset(result.nextBeginOffset);
- vector<MQMessageExt> msgs;
- pullRequest->getMessage(msgs);
- if ((msgs.size() == 0) && (result.nextBeginOffset > 0)) {
+ if ((pullRequest->getCacheMsgCount() == 0) && (result.nextBeginOffset > 0)) {
m_callbackOwner->updateConsumeOffset(pullRequest->m_messageQueue, result.nextBeginOffset);
}
if (bProducePullRequest) {
@@ -118,9 +116,7 @@
}
pullRequest->setNextOffset(result.nextBeginOffset);
- vector<MQMessageExt> msgs;
- pullRequest->getMessage(msgs);
- if ((msgs.size() == 0) && (result.nextBeginOffset > 0)) {
+ if ((pullRequest->getCacheMsgCount() == 0) && (result.nextBeginOffset > 0)) {
m_callbackOwner->updateConsumeOffset(pullRequest->m_messageQueue, result.nextBeginOffset);
}
if (bProducePullRequest) {
@@ -740,9 +736,7 @@
break;
}
request->setNextOffset(pullResult.nextBeginOffset);
- vector<MQMessageExt> msgs;
- request->getMessage(msgs);
- if ((msgs.size() == 0) && (pullResult.nextBeginOffset > 0)) {
+ if ((request->getCacheMsgCount() == 0) && (pullResult.nextBeginOffset > 0)) {
updateConsumeOffset(messageQueue, pullResult.nextBeginOffset);
}
producePullMsgTask(request);
@@ -756,9 +750,7 @@
break;
}
request->setNextOffset(pullResult.nextBeginOffset);
- vector<MQMessageExt> msgs;
- request->getMessage(msgs);
- if ((msgs.size() == 0) && (pullResult.nextBeginOffset > 0)) {
+ if ((request->getCacheMsgCount() == 0) && (pullResult.nextBeginOffset > 0)) {
updateConsumeOffset(messageQueue, pullResult.nextBeginOffset);
}
producePullMsgTask(request);
diff --git a/src/consumer/PullRequest.cpp b/src/consumer/PullRequest.cpp
index 9795880..d86ee7f 100644
--- a/src/consumer/PullRequest.cpp
+++ b/src/consumer/PullRequest.cpp
@@ -92,7 +92,7 @@
int PullRequest::getCacheMsgCount() {
boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
- return m_msgTreeMap.size();
+ return m_msgTreeMap.size() + m_msgTreeMapTemp.size();
}
void PullRequest::getMessageByQueueOffset(vector<MQMessageExt>& msgs, int64 minQueueOffset, int64 maxQueueOffset) {