[ISSUE#43] One Consumer Consume One Message Twice at the same time (#61)

Resolve one Consumer consume one message twice at the same time
diff --git a/src/MQClientAPIImpl.cpp b/src/MQClientAPIImpl.cpp
index 110601d..045bfab 100644
--- a/src/MQClientAPIImpl.cpp
+++ b/src/MQClientAPIImpl.cpp
@@ -451,6 +451,10 @@
   }
 }
 
+void MQClientAPIImpl::deleteOpaqueForDropPullRequest(const MQMessageQueue& mq, int opaque) {
+    m_pRemotingClient->deleteOpaqueForDropPullRequest(mq, opaque);
+}
+
 PullResult* MQClientAPIImpl::pullMessage(
     const string& addr, PullMessageRequestHeader* pRequestHeader,
     int timeoutMillis, int communicationMode, PullCallback* pullCallback,
@@ -480,9 +484,21 @@
                                        PullCallback* pullCallback, void* pArg) {
   //<!delete in future;
   AsyncCallbackWrap* cbw = new PullCallbackWarp(pullCallback, this, pArg);
+  MQMessageQueue mq;
+  AsyncArg* pAsyncArg = static_cast<AsyncArg*>(pArg);
+  if (pAsyncArg && pAsyncArg->pPullRequest) {
+    mq = pAsyncArg->mq;
+    pAsyncArg->pPullRequest->setLatestPullRequestOpaque(request.getOpaque());
+    LOG_DEBUG("pullMessageAsync set opaque:%d, mq:%s", 
+        pAsyncArg->pPullRequest->getLatestPullRequestOpaque(),mq.toString().c_str());
+  }
+
   if (m_pRemotingClient->invokeAsync(addr, request, cbw, timeoutMillis) ==
       false) {
-    LOG_ERROR("pullMessageAsync failed of addr:%s", addr.c_str());
+    LOG_ERROR("pullMessageAsync failed of addr:%s, opaque:%d, mq:%s", addr.c_str(), request.getOpaque(), mq.toString().data());
+    if (pAsyncArg && pAsyncArg->pPullRequest) {
+        pAsyncArg->pPullRequest->setLatestPullRequestOpaque(0);
+    }
     deleteAndZero(cbw);
     THROW_MQEXCEPTION(MQClientException, "pullMessageAsync failed", -1);
   }
diff --git a/src/MQClientAPIImpl.h b/src/MQClientAPIImpl.h
index 9f3ebb4..f2ac273 100644
--- a/src/MQClientAPIImpl.h
+++ b/src/MQClientAPIImpl.h
@@ -167,6 +167,8 @@
                         SendCallback* pSendCallback, int64 timeoutMilliseconds,
                         int maxRetryTimes=1,
                         int retrySendTimes=1);
+  void deleteOpaqueForDropPullRequest(const MQMessageQueue& mq, int opaque);
+
  private:
   SendResult sendMessageSync(const string& addr, const string& brokerName,
                              const MQMessage& msg, RemotingCommand& request,
diff --git a/src/MQClientFactory.cpp b/src/MQClientFactory.cpp
index e128077..606ccc1 100644
--- a/src/MQClientFactory.cpp
+++ b/src/MQClientFactory.cpp
@@ -1040,6 +1040,17 @@
   }
 }
 
+void MQClientFactory::removeDropedPullRequestOpaque(PullRequest* pullRequest) {
+  //delete the opaque record that's ignore the response of this pullrequest when drop pullrequest
+  if (!pullRequest) return;
+  MQMessageQueue mq = pullRequest->m_messageQueue;
+  int opaque = pullRequest->getLatestPullRequestOpaque();
+  if (opaque > 0) {
+      LOG_INFO("####### need delete the pullrequest for opaque:%d, mq:%s", opaque, mq.toString().data());
+      getMQClientAPIImpl()->deleteOpaqueForDropPullRequest(mq, opaque);
+  }
+}
+
 void MQClientFactory::resetOffset(
     const string& group, const string& topic,
     const map<MQMessageQueue, int64>& offsetTable) {
@@ -1052,6 +1063,9 @@
       PullRequest* pullreq = pConsumer->getRebalance()->getPullRequest(mq);
       if (pullreq) {
         pullreq->setDroped(true);
+        LOG_INFO("resetOffset setDroped for opaque:%d, mq:%s", pullreq->getLatestPullRequestOpaque(), mq.toString().data());
+        //delete the opaque record that's ignore the response of this pullrequest when drop pullrequest
+        removeDropedPullRequestOpaque(pullreq);
         pullreq->clearAllMsgs();
         pullreq->updateQueueMaxOffset(it->second);
       } else {
diff --git a/src/MQClientFactory.h b/src/MQClientFactory.h
index 34ac2a9..7da2e21 100644
--- a/src/MQClientFactory.h
+++ b/src/MQClientFactory.h
@@ -113,7 +113,7 @@
                           map<int, string>& brokerAddrs);
   map<string, map<int, string>> getBrokerAddrMap();
   void clearBrokerAddrMap();
-
+  void removeDropedPullRequestOpaque(PullRequest* pullRequest);
  private:
   void unregisterClient(const string& producerGroup,
                         const string& consumerGroup,
diff --git a/src/common/AsyncArg.h b/src/common/AsyncArg.h
index fc358cb..4e23743 100755
--- a/src/common/AsyncArg.h
+++ b/src/common/AsyncArg.h
@@ -21,7 +21,7 @@
 #include "MQMessageQueue.h"

 #include "PullAPIWrapper.h"

 #include "SubscriptionData.h"

-

+#include "../consumer/PullRequest.h"

 namespace rocketmq {

 //<!***************************************************************************

 

@@ -29,6 +29,7 @@
   MQMessageQueue mq;

   SubscriptionData subData;

   PullAPIWrapper* pPullWrapper;

+  PullRequest* pPullRequest;

 };

 

 //<!***************************************************************************

diff --git a/src/consumer/DefaultMQPullConsumer.cpp b/src/consumer/DefaultMQPullConsumer.cpp
index 073a801..bfc4e9d 100755
--- a/src/consumer/DefaultMQPullConsumer.cpp
+++ b/src/consumer/DefaultMQPullConsumer.cpp
@@ -263,6 +263,7 @@
   arg.mq = mq;
   arg.subData = *pSData;
   arg.pPullWrapper = m_pPullAPIWrapper;
+  arg.pPullRequest = NULL;
 
   try {
     unique_ptr<PullResult> pullResult(m_pPullAPIWrapper->pullKernelImpl(
diff --git a/src/consumer/DefaultMQPushConsumer.cpp b/src/consumer/DefaultMQPushConsumer.cpp
index 2e78ebc..11e16ed 100644
--- a/src/consumer/DefaultMQPushConsumer.cpp
+++ b/src/consumer/DefaultMQPushConsumer.cpp
@@ -840,7 +840,7 @@
         arg.mq = messageQueue;
         arg.subData = *pSdata;
         arg.pPullWrapper = m_pPullAPIWrapper;
-
+        arg.pPullRequest = request;
         try {
             request->setLastPullTimestamp(UtilAll::currentTimeMillis());
             m_pPullAPIWrapper->pullKernelImpl(
diff --git a/src/consumer/PullRequest.cpp b/src/consumer/PullRequest.cpp
index bb578f8..8510e43 100644
--- a/src/consumer/PullRequest.cpp
+++ b/src/consumer/PullRequest.cpp
@@ -28,7 +28,8 @@
       m_queueOffsetMax(0),

       m_bDroped(false),

       m_bLocked(false),

-      m_bPullMsgEventInprogress(false) {}

+      m_bPullMsgEventInprogress(false),

+      m_latestPullRequestOpaque(0) {}

 

 PullRequest::~PullRequest() {

   m_msgTreeMapTemp.clear();

@@ -45,6 +46,7 @@
     m_messageQueue = other.m_messageQueue;

     m_msgTreeMap = other.m_msgTreeMap;

     m_msgTreeMapTemp = other.m_msgTreeMapTemp;

+    m_latestPullRequestOpaque = other.m_latestPullRequestOpaque;

   }

   return *this;

 }

@@ -260,5 +262,15 @@
   return false;

 }

 

+int PullRequest::getLatestPullRequestOpaque() const {

+    boost::lock_guard<boost::mutex> lock(m_pullRequestLock);

+    return m_latestPullRequestOpaque;

+}

+

+void PullRequest::setLatestPullRequestOpaque(int opaque) {

+    boost::lock_guard<boost::mutex> lock(m_pullRequestLock);

+    m_latestPullRequestOpaque = opaque;

+}

+

 //<!***************************************************************************

 }  //<!end namespace;

diff --git a/src/consumer/PullRequest.h b/src/consumer/PullRequest.h
index f7abfaf..80c286a 100644
--- a/src/consumer/PullRequest.h
+++ b/src/consumer/PullRequest.h
@@ -69,6 +69,8 @@
   boost::timed_mutex& getPullRequestCriticalSection();
   void removePullMsgEvent();
   bool addPullMsgEvent();
+  int getLatestPullRequestOpaque() const;
+  void setLatestPullRequestOpaque(int opaque);
 
  public:
   MQMessageQueue m_messageQueue;
@@ -88,6 +90,7 @@
   //uint64 m_tryUnlockTimes;
   uint64 m_lastPullTimestamp;
   uint64 m_lastConsumeTimestamp;
+  int    m_latestPullRequestOpaque;
   boost::timed_mutex m_consumeLock;
   boost::atomic<bool> m_bPullMsgEventInprogress;
 };
diff --git a/src/consumer/Rebalance.cpp b/src/consumer/Rebalance.cpp
index 4a35673..f23b727 100644
--- a/src/consumer/Rebalance.cpp
+++ b/src/consumer/Rebalance.cpp
@@ -471,11 +471,13 @@
           (find(mqsSelf.begin(), mqsSelf.end(), mqtemp) == mqsSelf.end())) {
         if (!(it->second->isDroped())) {
           it->second->setDroped(true);
+          //delete the lastest pull request for this mq, which hasn't been response
+          m_pClientFactory->removeDropedPullRequestOpaque(it->second);
           removeUnnecessaryMessageQueue(mqtemp);
           it->second->clearAllMsgs();  // add clear operation to avoid bad state
                                        // when dropped pullRequest returns
                                        // normal
-          LOG_INFO("drop mq:%s", mqtemp.toString().c_str());
+          LOG_INFO("drop mq:%s, delete opaque:%d", mqtemp.toString().c_str(), it->second->getLatestPullRequestOpaque());
         }
         changed = true;
       }
diff --git a/src/transport/TcpRemotingClient.cpp b/src/transport/TcpRemotingClient.cpp
index 551df24..603c17f 100755
--- a/src/transport/TcpRemotingClient.cpp
+++ b/src/transport/TcpRemotingClient.cpp
@@ -592,7 +592,7 @@
     if (!pfuture->getAsyncResponseFlag()) {

       pfuture->setAsyncResponseFlag();

       pfuture->setAsyncCallBackStatus(asyncCallBackStatus_response);

-	  cancelTimerCallback(opaque);

+      cancelTimerCallback(opaque);

       pfuture->executeInvokeCallback();	  

     }

   }

@@ -707,7 +707,7 @@
 void TcpRemotingClient::eraseTimerCallback(int opaque) {

   boost::lock_guard<boost::mutex> lock(m_timerMapMutex);

   if (m_async_timer_map.find(opaque) != m_async_timer_map.end()) {

-  	LOG_DEBUG("eraseTimerCallback: opaque:%lld", opaque);

+    LOG_DEBUG("eraseTimerCallback: opaque:%lld", opaque);

     boost::asio::deadline_timer* t = m_async_timer_map[opaque];

     delete t;

     t = NULL;

@@ -739,5 +739,20 @@
   m_async_timer_map.clear();

 }

 

+void TcpRemotingClient::deleteOpaqueForDropPullRequest(const MQMessageQueue& mq, int opaque) {

+  //delete the map record of opaque<->ResponseFuture, so the answer for the pull request will discard when receive it later

+  boost::shared_ptr<ResponseFuture> pFuture(findAndDeleteAsyncResponseFuture(opaque));

+  if (!pFuture) {

+    pFuture = findAndDeleteResponseFuture(opaque);

+    if (pFuture) {

+      LOG_DEBUG("succ deleted the sync pullrequest for opaque:%d, mq:%s", opaque, mq.toString().data());

+    }

+  } else {

+    LOG_DEBUG("succ deleted the async pullrequest for opaque:%d, mq:%s", opaque, mq.toString().data()); 

+  }

+  //delete the timeout timer for opaque for pullrequest

+  cancelTimerCallback(opaque);

+}

+

 //<!************************************************************************

 }  //<!end namespace;

diff --git a/src/transport/TcpRemotingClient.h b/src/transport/TcpRemotingClient.h
index fcdd8a8..c338d53 100755
--- a/src/transport/TcpRemotingClient.h
+++ b/src/transport/TcpRemotingClient.h
@@ -58,6 +58,7 @@
   void boost_asio_work();

   void handleAsyncPullForResponseTimeout(const boost::system::error_code& e,

                                          int opaque);

+  void deleteOpaqueForDropPullRequest(const MQMessageQueue& mq, int opaque);

 

  private:

   static void static_messageReceived(void* context, const MemoryBlock& mem,