fix(consumer): send back error when consuming failed. (#210)

diff --git a/include/DefaultMQPullConsumer.h b/include/DefaultMQPullConsumer.h
index 33765cd..899862b 100644
--- a/include/DefaultMQPullConsumer.h
+++ b/include/DefaultMQPullConsumer.h
@@ -43,7 +43,7 @@
   //<!end mqadmin;

 

   //<!begin MQConsumer

-  virtual bool sendMessageBack(MQMessageExt& msg, int delayLevel);

+  virtual bool sendMessageBack(MQMessageExt& msg, int delayLevel, std::string& brokerName);

   virtual void fetchSubscribeMessageQueues(const std::string& topic, std::vector<MQMessageQueue>& mqs);

   virtual void doRebalance();

   virtual void persistConsumerOffset();

diff --git a/include/DefaultMQPushConsumer.h b/include/DefaultMQPushConsumer.h
index b29ca72..43f0fbb 100644
--- a/include/DefaultMQPushConsumer.h
+++ b/include/DefaultMQPushConsumer.h
@@ -55,7 +55,7 @@
   //<!end mqadmin;

 

   //<!begin MQConsumer

-  virtual bool sendMessageBack(MQMessageExt& msg, int delayLevel);

+  virtual bool sendMessageBack(MQMessageExt& msg, int delayLevel, std::string& brokerName);

   virtual void fetchSubscribeMessageQueues(const std::string& topic, std::vector<MQMessageQueue>& mqs);

   virtual void doRebalance();

   virtual void persistConsumerOffset();

diff --git a/include/MQConsumer.h b/include/MQConsumer.h
index 54b535a..87e2c1b 100644
--- a/include/MQConsumer.h
+++ b/include/MQConsumer.h
@@ -32,7 +32,7 @@
 class ROCKETMQCLIENT_API MQConsumer : public MQClient {

  public:

   virtual ~MQConsumer() {}

-  virtual bool sendMessageBack(MQMessageExt& msg, int delayLevel) = 0;

+  virtual bool sendMessageBack(MQMessageExt& msg, int delayLevel, std::string& brokerName) = 0;

   virtual void fetchSubscribeMessageQueues(const std::string& topic, std::vector<MQMessageQueue>& mqs) = 0;

   virtual void doRebalance() = 0;

   virtual void persistConsumerOffset() = 0;

diff --git a/src/MQClientAPIImpl.cpp b/src/MQClientAPIImpl.cpp
index f890968..0877a03 100644
--- a/src/MQClientAPIImpl.cpp
+++ b/src/MQClientAPIImpl.cpp
@@ -823,7 +823,8 @@
   m_pRemotingClient->invokeOneway(addr, request);
 }
 
-void MQClientAPIImpl::consumerSendMessageBack(MQMessageExt& msg,
+void MQClientAPIImpl::consumerSendMessageBack(const string addr,
+                                              MQMessageExt& msg,
                                               const string& consumerGroup,
                                               int delayLevel,
                                               int timeoutMillis,
@@ -833,7 +834,7 @@
   pRequestHeader->offset = msg.getCommitLogOffset();
   pRequestHeader->delayLevel = delayLevel;
 
-  string addr = socketAddress2IPPort(msg.getStoreHost());
+  // string addr = socketAddress2IPPort(msg.getStoreHost());
   RemotingCommand request(CONSUMER_SEND_MSG_BACK, pRequestHeader);
   callSignatureBeforeRequest(addr, request, sessionCredentials);
   request.Encode();
diff --git a/src/MQClientAPIImpl.h b/src/MQClientAPIImpl.h
index 763e45d..9555d72 100644
--- a/src/MQClientAPIImpl.h
+++ b/src/MQClientAPIImpl.h
@@ -167,7 +167,8 @@
                                   int timeoutMillis,
                                   const SessionCredentials& sessionCredentials);
 
-  void consumerSendMessageBack(MQMessageExt& msg,
+  void consumerSendMessageBack(const string addr,
+                               MQMessageExt& msg,
                                const string& consumerGroup,
                                int delayLevel,
                                int timeoutMillis,
diff --git a/src/common/MQClient.cpp b/src/common/MQClient.cpp
index afdc5fa..068b8c4 100644
--- a/src/common/MQClient.cpp
+++ b/src/common/MQClient.cpp
@@ -53,7 +53,8 @@
 string MQClient::getMQClientId() const {
   string clientIP = UtilAll::getLocalAddress();
   string processId = UtilAll::to_string(getpid());
-  return processId + "-" + clientIP + "@" + m_instanceName;
+  // return processId + "-" + clientIP + "@" + m_instanceName;
+  return clientIP + "@" + processId + "#" + m_instanceName;
 }
 
 //<!groupName;
diff --git a/src/consumer/ConsumeMessageConcurrentlyService.cpp b/src/consumer/ConsumeMessageConcurrentlyService.cpp
index 93cdcc3..371faa2 100644
--- a/src/consumer/ConsumeMessageConcurrentlyService.cpp
+++ b/src/consumer/ConsumeMessageConcurrentlyService.cpp
@@ -189,14 +189,18 @@
     case CLUSTERING: {

       // send back msg to broker;

       for (size_t i = ackIndex + 1; i < msgs.size(); i++) {

-        LOG_WARN("consume fail, MQ is:%s, its msgId is:%s, index is:" SIZET_FMT ", reconsume times is:%d",

-                 (request->m_messageQueue).toString().c_str(), msgs[i].getMsgId().c_str(), i,

-                 msgs[i].getReconsumeTimes());

-        if (m_pConsumer->getConsumeType() == CONSUME_PASSIVELY && !m_pConsumer->sendMessageBack(msgs[i], 0)) {

-          LOG_WARN("Send message back fail, MQ is:%s, its msgId is:%s, index is:%d, re-consume times is:%d",

-                   (request->m_messageQueue).toString().c_str(), msgs[i].getMsgId().c_str(), i,

-                   msgs[i].getReconsumeTimes());

-          localRetryMsgs.push_back(msgs[i]);

+        LOG_DEBUG("consume fail, MQ is:%s, its msgId is:%s, index is:" SIZET_FMT ", reconsume times is:%d",

+                  (request->m_messageQueue).toString().c_str(), msgs[i].getMsgId().c_str(), i,

+                  msgs[i].getReconsumeTimes());

+        if (m_pConsumer->getConsumeType() == CONSUME_PASSIVELY) {

+          string brokerName = request->m_messageQueue.getBrokerName();

+          if (!m_pConsumer->sendMessageBack(msgs[i], 0, brokerName)) {

+            LOG_WARN("Send message back fail, MQ is:%s, its msgId is:%s, index is:%d, re-consume times is:%d",

+                     (request->m_messageQueue).toString().c_str(), msgs[i].getMsgId().c_str(), i,

+                     msgs[i].getReconsumeTimes());

+            msgs[i].setReconsumeTimes(msgs[i].getReconsumeTimes() + 1);

+            localRetryMsgs.push_back(msgs[i]);

+          }

         }

       }

       break;

diff --git a/src/consumer/DefaultMQPullConsumer.cpp b/src/consumer/DefaultMQPullConsumer.cpp
index c8aac49..1f58e86 100644
--- a/src/consumer/DefaultMQPullConsumer.cpp
+++ b/src/consumer/DefaultMQPullConsumer.cpp
@@ -142,7 +142,7 @@
   }
 }
 
-bool DefaultMQPullConsumer::sendMessageBack(MQMessageExt& msg, int delayLevel) {
+bool DefaultMQPullConsumer::sendMessageBack(MQMessageExt& msg, int delayLevel, string& brokerName) {
   return true;
 }
 
diff --git a/src/consumer/DefaultMQPushConsumer.cpp b/src/consumer/DefaultMQPushConsumer.cpp
index 376c3e9..df77cac 100644
--- a/src/consumer/DefaultMQPushConsumer.cpp
+++ b/src/consumer/DefaultMQPushConsumer.cpp
@@ -251,9 +251,14 @@
   m_subTopics.clear();
 }
 
-bool DefaultMQPushConsumer::sendMessageBack(MQMessageExt& msg, int delayLevel) {
+bool DefaultMQPushConsumer::sendMessageBack(MQMessageExt& msg, int delayLevel, string& brokerName) {
+  string brokerAddr;
+  if (!brokerName.empty())
+    brokerAddr = getFactory()->findBrokerAddressInPublish(brokerName);
+  else
+    brokerAddr = socketAddress2IPPort(msg.getStoreHost());
   try {
-    getFactory()->getMQClientAPIImpl()->consumerSendMessageBack(msg, getGroupName(), delayLevel, 3000,
+    getFactory()->getMQClientAPIImpl()->consumerSendMessageBack(brokerAddr, msg, getGroupName(), delayLevel, 3000,
                                                                 getSessionCredentials());
   } catch (MQException& e) {
     LOG_ERROR(e.what());
diff --git a/src/consumer/Rebalance.cpp b/src/consumer/Rebalance.cpp
index 0a4c06f..5546b61 100644
--- a/src/consumer/Rebalance.cpp
+++ b/src/consumer/Rebalance.cpp
@@ -128,7 +128,7 @@
             std::stringstream ss;
             ss << "Allocation result for [Consumer Group: " << m_pConsumer->getGroupName() << ", Topic: " << topic
                << ", Current Consumer ID: " << m_pConsumer->getMQClientId() << "] is changed.\n "
-               << "Total Queue #: " << mqAll.size() << ", Total Consumer #: " << cidAll.size()
+               << "Total Queue :#" << mqAll.size() << ", Total Consumer :#" << cidAll.size()
                << " Allocated Queues are: \n";
 
             for (vector<MQMessageQueue>::size_type i = 0; i < allocateResult.size(); ++i) {