fix: orderly pusher consumer may commit queue offset too early
diff --git a/src/consumer/DefaultMQPushConsumerImpl.cpp b/src/consumer/DefaultMQPushConsumerImpl.cpp
index 1aa0b76..4a79595 100644
--- a/src/consumer/DefaultMQPushConsumerImpl.cpp
+++ b/src/consumer/DefaultMQPushConsumerImpl.cpp
@@ -96,9 +96,7 @@
         }
         pullRequest->setNextOffset(result.nextBeginOffset);
 
-        vector<MQMessageExt> msgs;
-        pullRequest->getMessage(msgs);
-        if ((msgs.size() == 0) && (result.nextBeginOffset > 0)) {
+        if ((pullRequest->getCacheMsgCount() == 0) && (result.nextBeginOffset > 0)) {
           m_callbackOwner->updateConsumeOffset(pullRequest->m_messageQueue, result.nextBeginOffset);
         }
         if (bProducePullRequest) {
@@ -118,9 +116,7 @@
         }
         pullRequest->setNextOffset(result.nextBeginOffset);
 
-        vector<MQMessageExt> msgs;
-        pullRequest->getMessage(msgs);
-        if ((msgs.size() == 0) && (result.nextBeginOffset > 0)) {
+        if ((pullRequest->getCacheMsgCount() == 0) && (result.nextBeginOffset > 0)) {
           m_callbackOwner->updateConsumeOffset(pullRequest->m_messageQueue, result.nextBeginOffset);
         }
         if (bProducePullRequest) {
@@ -740,9 +736,7 @@
           break;
         }
         request->setNextOffset(pullResult.nextBeginOffset);
-        vector<MQMessageExt> msgs;
-        request->getMessage(msgs);
-        if ((msgs.size() == 0) && (pullResult.nextBeginOffset > 0)) {
+        if ((request->getCacheMsgCount() == 0) && (pullResult.nextBeginOffset > 0)) {
           updateConsumeOffset(messageQueue, pullResult.nextBeginOffset);
         }
         producePullMsgTask(request);
@@ -756,9 +750,7 @@
           break;
         }
         request->setNextOffset(pullResult.nextBeginOffset);
-        vector<MQMessageExt> msgs;
-        request->getMessage(msgs);
-        if ((msgs.size() == 0) && (pullResult.nextBeginOffset > 0)) {
+        if ((request->getCacheMsgCount() == 0) && (pullResult.nextBeginOffset > 0)) {
           updateConsumeOffset(messageQueue, pullResult.nextBeginOffset);
         }
         producePullMsgTask(request);
diff --git a/src/consumer/PullRequest.cpp b/src/consumer/PullRequest.cpp
index 9795880..d86ee7f 100644
--- a/src/consumer/PullRequest.cpp
+++ b/src/consumer/PullRequest.cpp
@@ -92,7 +92,7 @@
 

 int PullRequest::getCacheMsgCount() {

   boost::lock_guard<boost::mutex> lock(m_pullRequestLock);

-  return m_msgTreeMap.size();

+  return m_msgTreeMap.size() + m_msgTreeMapTemp.size();

 }

 

 void PullRequest::getMessageByQueueOffset(vector<MQMessageExt>& msgs, int64 minQueueOffset, int64 maxQueueOffset) {