[ISSUE #137] split TcpRemotingClient::m_ioService into m_dispatchService and m_handleService

diff --git a/example/CBatchProducer.c b/example/CBatchProducer.c
new file mode 100644
index 0000000..32d6a7a
--- /dev/null
+++ b/example/CBatchProducer.c
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdio.h>
+#include "CBatchMessage.h"
+#include "CCommon.h"
+#include "CMessage.h"
+#include "CProducer.h"
+#include "CSendResult.h"
+
+/* Sends one batch of 10 messages through producer and prints how the send call ended. */
+void StartSendMessage(CProducer* producer) {
+  int i = 0;
+  char body[128];
+  CBatchMessage* batchMessage = CreateBatchMessage(); /* declared and defined without parameters */
+  for (i = 0; i < 10; i++) {
+    CMessage* msg = CreateMessage("T_TestTopic");
+    SetMessageTags(msg, "Test_Tag");
+    SetMessageKeys(msg, "Test_Keys");
+    snprintf(body, sizeof(body), "new message body, index %d", i); /* snprintf NUL-terminates body */
+    SetMessageBody(msg, body);
+    AddMessage(batchMessage, msg);
+    /* AddMessage stores a copy of the message, so the original must be released here. */
+    DestroyMessage(msg);
+  }
+  CSendResult result;
+  int ok = SendBatchMessage(producer, batchMessage, &result);
+  printf("SendBatchMessage is %s .....\n", ok == 0 ? "Success" : ok == 11 ? "FAILED" : " It is null value");
+  DestroyBatchMessage(batchMessage);
+}
+
+void CreateProducerAndStartSendMessage() { /* creates, starts, uses, shuts down and destroys one producer */
+  printf("Producer Initializing.....\n");
+  CProducer* producer = CreateProducer("Group_producer");
+  SetProducerNameServerAddress(producer, "127.0.0.1:9876");
+  StartProducer(producer); /* NOTE(review): any status StartProducer reports is not checked - confirm */
+  printf("Producer start.....\n");
+  StartSendMessage(producer);
+  ShutdownProducer(producer);
+  DestroyProducer(producer);
+  printf("Producer Shutdown!\n");
+}
+
+int main(int argc, char* argv[]) { /* entry point of the batch-send example; argc and argv are not used */
+  printf("Send Batch.....\n");
+  CreateProducerAndStartSendMessage();
+  return 0;
+}
diff --git a/include/CBatchMessage.h b/include/CBatchMessage.h
new file mode 100644
index 0000000..6409a55
--- /dev/null
+++ b/include/CBatchMessage.h
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __C_BATCHMESSAGE_H__
+#define __C_BATCHMESSAGE_H__
+#include "CCommon.h"
+#include "CMessage.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct CBatchMessage CBatchMessage;  // opaque batch handle; callers only hold pointers to it
+
+ROCKETMQCLIENT_API CBatchMessage* CreateBatchMessage();  // returns a new, empty batch
+ROCKETMQCLIENT_API int AddMessage(CBatchMessage* batchMsg, CMessage* msg);  // appends a copy of msg to batchMsg
+ROCKETMQCLIENT_API int DestroyBatchMessage(CBatchMessage* batchMsg);  // frees a batch made by CreateBatchMessage
+
+#ifdef __cplusplus
+};
+#endif
+#endif  //__C_BATCHMESSAGE_H__
diff --git a/include/CProducer.h b/include/CProducer.h
index e345e60..6b00575 100644
--- a/include/CProducer.h
+++ b/include/CProducer.h
@@ -18,6 +18,7 @@
 #ifndef __C_PRODUCER_H__
 #define __C_PRODUCER_H__
 
+#include "CBatchMessage.h"
 #include "CMessage.h"
 #include "CSendResult.h"
 #include "CMQException.h"
@@ -53,6 +54,7 @@
 ROCKETMQCLIENT_API int SetProducerMaxMessageSize(CProducer* producer, int size);
 
 ROCKETMQCLIENT_API int SendMessageSync(CProducer* producer, CMessage* msg, CSendResult* result);
+ROCKETMQCLIENT_API int SendBatchMessage(CProducer* producer, CBatchMessage* msg, CSendResult* result);
 ROCKETMQCLIENT_API int SendMessageAsync(CProducer* producer,
                                         CMessage* msg,
                                         CSendSuccessCallback cSendSuccessCallback,
@@ -79,4 +81,4 @@
 #ifdef __cplusplus
 };
 #endif
-#endif  //__C_PRODUCER_H__
+#endif  //__C_PRODUCER_H__
\ No newline at end of file
diff --git a/src/MQClientAPIImpl.cpp b/src/MQClientAPIImpl.cpp
index 8ec6044..2038f6c 100644
--- a/src/MQClientAPIImpl.cpp
+++ b/src/MQClientAPIImpl.cpp
@@ -430,10 +430,6 @@
   }
 }
 
-void MQClientAPIImpl::deleteOpaqueForDropPullRequest(const MQMessageQueue& mq, int opaque) {
-  m_pRemotingClient->deleteOpaqueForDropPullRequest(mq, opaque);
-}
-
 PullResult* MQClientAPIImpl::pullMessage(const string& addr,
                                          PullMessageRequestHeader* pRequestHeader,
                                          int timeoutMillis,
@@ -467,21 +463,9 @@
                                        void* pArg) {
   //<!delete in future;
   AsyncCallbackWrap* cbw = new PullCallbackWarp(pullCallback, this, pArg);
-  MQMessageQueue mq;
-  AsyncArg* pAsyncArg = static_cast<AsyncArg*>(pArg);
-  if (pAsyncArg && pAsyncArg->pPullRequest) {
-    mq = pAsyncArg->mq;
-    pAsyncArg->pPullRequest->setLatestPullRequestOpaque(request.getOpaque());
-    LOG_DEBUG("pullMessageAsync set opaque:%d, mq:%s", pAsyncArg->pPullRequest->getLatestPullRequestOpaque(),
-              mq.toString().c_str());
-  }
-
   if (m_pRemotingClient->invokeAsync(addr, request, cbw, timeoutMillis) == false) {
-    LOG_ERROR("pullMessageAsync failed of addr:%s, opaque:%d, mq:%s", addr.c_str(), request.getOpaque(),
-              mq.toString().data());
-    if (pAsyncArg && pAsyncArg->pPullRequest) {
-      pAsyncArg->pPullRequest->setLatestPullRequestOpaque(0);
-    }
+    LOG_ERROR("pullMessageAsync failed of addr:%s, mq:%s", addr.c_str(),  // keep the old NULL guard on pArg
+              pArg ? static_cast<AsyncArg*>(pArg)->mq.toString().data() : "");
     deleteAndZero(cbw);
     THROW_MQEXCEPTION(MQClientException, "pullMessageAsync failed", -1);
   }
@@ -919,4 +903,4 @@
 }
 
 //<!************************************************************************
-}  //<!end namespace;
+}  // namespace rocketmq
diff --git a/src/MQClientAPIImpl.h b/src/MQClientAPIImpl.h
index 1a5e202..a36038d 100644
--- a/src/MQClientAPIImpl.h
+++ b/src/MQClientAPIImpl.h
@@ -187,7 +187,6 @@
                         int64 timeoutMilliseconds,
                         int maxRetryTimes = 1,
                         int retrySendTimes = 1);
-  void deleteOpaqueForDropPullRequest(const MQMessageQueue& mq, int opaque);
 
  private:
   SendResult sendMessageSync(const string& addr,
diff --git a/src/MQClientFactory.cpp b/src/MQClientFactory.cpp
index cfa62dd..38ce229 100644
--- a/src/MQClientFactory.cpp
+++ b/src/MQClientFactory.cpp
@@ -988,18 +988,6 @@
   }
 }
 
-void MQClientFactory::removeDropedPullRequestOpaque(PullRequest* pullRequest) {
-  // delete the opaque record that's ignore the response of this pullrequest when drop pullrequest
-  if (!pullRequest)
-    return;
-  MQMessageQueue mq = pullRequest->m_messageQueue;
-  int opaque = pullRequest->getLatestPullRequestOpaque();
-  if (opaque > 0) {
-    LOG_INFO("####### need delete the pullrequest for opaque:%d, mq:%s", opaque, mq.toString().data());
-    getMQClientAPIImpl()->deleteOpaqueForDropPullRequest(mq, opaque);
-  }
-}
-
 void MQClientFactory::resetOffset(const string& group,
                                   const string& topic,
                                   const map<MQMessageQueue, int64>& offsetTable) {
@@ -1012,10 +1000,7 @@
       PullRequest* pullreq = pConsumer->getRebalance()->getPullRequest(mq);
       if (pullreq) {
         pullreq->setDroped(true);
-        LOG_INFO("resetOffset setDroped for opaque:%d, mq:%s", pullreq->getLatestPullRequestOpaque(),
-                 mq.toString().data());
-        // delete the opaque record that's ignore the response of this pullrequest when drop pullrequest
-        // removeDropedPullRequestOpaque(pullreq);
+        LOG_INFO("resetOffset setDroped for mq:%s", mq.toString().data());
         pullreq->clearAllMsgs();
         pullreq->updateQueueMaxOffset(it->second);
       } else {
@@ -1102,4 +1087,4 @@
 }
 
 //<!************************************************************************
-}  //<!end namespace;
+}  // namespace rocketmq
diff --git a/src/MQClientFactory.h b/src/MQClientFactory.h
index e5d6200..eeb3637 100644
--- a/src/MQClientFactory.h
+++ b/src/MQClientFactory.h
@@ -108,7 +108,6 @@
   void addBrokerToAddrMap(const string& brokerName, map<int, string>& brokerAddrs);
   map<string, map<int, string>> getBrokerAddrMap();
   void clearBrokerAddrMap();
-  void removeDropedPullRequestOpaque(PullRequest* pullRequest);
 
  private:
   void unregisterClient(const string& producerGroup,
diff --git a/src/common/AsyncArg.h b/src/common/AsyncArg.h
index 4e23743..fc358cb 100644
--- a/src/common/AsyncArg.h
+++ b/src/common/AsyncArg.h
@@ -21,7 +21,7 @@
 #include "MQMessageQueue.h"

 #include "PullAPIWrapper.h"

 #include "SubscriptionData.h"

-#include "../consumer/PullRequest.h"

+

 namespace rocketmq {

 //<!***************************************************************************

 

@@ -29,7 +29,6 @@
   MQMessageQueue mq;

   SubscriptionData subData;

   PullAPIWrapper* pPullWrapper;

-  PullRequest* pPullRequest;

 };

 

 //<!***************************************************************************

diff --git a/src/consumer/DefaultMQPullConsumer.cpp b/src/consumer/DefaultMQPullConsumer.cpp
index 0db381f..2175feb 100644
--- a/src/consumer/DefaultMQPullConsumer.cpp
+++ b/src/consumer/DefaultMQPullConsumer.cpp
@@ -263,7 +263,6 @@
   arg.mq = mq;
   arg.subData = *pSData;
   arg.pPullWrapper = m_pPullAPIWrapper;
-  arg.pPullRequest = NULL;
 
   try {
     unique_ptr<PullResult> pullResult(m_pPullAPIWrapper->pullKernelImpl(mq,                      // 1
diff --git a/src/consumer/DefaultMQPushConsumer.cpp b/src/consumer/DefaultMQPushConsumer.cpp
index f9e5874..0f51ea1 100644
--- a/src/consumer/DefaultMQPushConsumer.cpp
+++ b/src/consumer/DefaultMQPushConsumer.cpp
@@ -784,7 +784,6 @@
   arg.mq = messageQueue;
   arg.subData = *pSdata;
   arg.pPullWrapper = m_pPullAPIWrapper;
-  arg.pPullRequest = request;
   try {
     request->setLastPullTimestamp(UtilAll::currentTimeMillis());
     m_pPullAPIWrapper->pullKernelImpl(messageQueue,                                 // 1
diff --git a/src/consumer/PullRequest.cpp b/src/consumer/PullRequest.cpp
index 5fd6d58..162be2a 100644
--- a/src/consumer/PullRequest.cpp
+++ b/src/consumer/PullRequest.cpp
@@ -28,7 +28,6 @@
       m_queueOffsetMax(0),

       m_bDroped(false),

       m_bLocked(false),

-      m_latestPullRequestOpaque(0),

       m_bPullMsgEventInprogress(false) {}

 

 PullRequest::~PullRequest() {

@@ -46,7 +45,6 @@
     m_messageQueue = other.m_messageQueue;

     m_msgTreeMap = other.m_msgTreeMap;

     m_msgTreeMapTemp = other.m_msgTreeMapTemp;

-    m_latestPullRequestOpaque = other.m_latestPullRequestOpaque;

   }

   return *this;

 }

@@ -272,15 +270,5 @@
   return false;

 }

 

-int PullRequest::getLatestPullRequestOpaque() {

-  boost::lock_guard<boost::mutex> lock(m_pullRequestLock);

-  return m_latestPullRequestOpaque;

-}

-

-void PullRequest::setLatestPullRequestOpaque(int opaque) {

-  boost::lock_guard<boost::mutex> lock(m_pullRequestLock);

-  m_latestPullRequestOpaque = opaque;

-}

-

 //<!***************************************************************************

 }  //<!end namespace;

diff --git a/src/consumer/PullRequest.h b/src/consumer/PullRequest.h
index cef6df3..83b39b8 100644
--- a/src/consumer/PullRequest.h
+++ b/src/consumer/PullRequest.h
@@ -68,8 +68,6 @@
   boost::timed_mutex& getPullRequestCriticalSection();
   void removePullMsgEvent();
   bool addPullMsgEvent();
-  int getLatestPullRequestOpaque();
-  void setLatestPullRequestOpaque(int opaque);
 
  public:
   MQMessageQueue m_messageQueue;
@@ -89,7 +87,6 @@
   // uint64 m_tryUnlockTimes;
   uint64 m_lastPullTimestamp;
   uint64 m_lastConsumeTimestamp;
-  int m_latestPullRequestOpaque;
   boost::timed_mutex m_consumeLock;
   boost::atomic<bool> m_bPullMsgEventInprogress;
 };
diff --git a/src/consumer/Rebalance.cpp b/src/consumer/Rebalance.cpp
index a2997a7..549e412 100644
--- a/src/consumer/Rebalance.cpp
+++ b/src/consumer/Rebalance.cpp
@@ -458,7 +458,7 @@
           it->second->clearAllMsgs();  // add clear operation to avoid bad state
                                        // when dropped pullRequest returns
                                        // normal
-          LOG_INFO("drop mq:%s, delete opaque:%d", mqtemp.toString().c_str(), it->second->getLatestPullRequestOpaque());
+          LOG_INFO("drop mq:%s", mqtemp.toString().c_str());
         }
         changed = true;
       }
@@ -634,4 +634,4 @@
 }
 
 //<!************************************************************************
-}  //<!end namespace;
+}  // namespace rocketmq
diff --git a/src/extern/CBatchMessage.cpp b/src/extern/CBatchMessage.cpp
new file mode 100644
index 0000000..3fb2a97
--- /dev/null
+++ b/src/extern/CBatchMessage.cpp
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <vector>
+
+#include "CBatchMessage.h"
+#include "CCommon.h"
+#include "CMessage.h"
+#include "MQMessage.h"
+
+using std::vector;
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+using namespace rocketmq;
+
+CBatchMessage* CreateBatchMessage() {
+  // The opaque handle is a heap-allocated, initially empty vector<MQMessage>.
+  return reinterpret_cast<CBatchMessage*>(new vector<MQMessage>());
+}
+
+int AddMessage(CBatchMessage* batchMsg, CMessage* msg) {
+  // Both handles are required; a missing one is reported as NULL_POINTER.
+  if (msg == NULL || batchMsg == NULL) {
+    return NULL_POINTER;
+  }
+  vector<MQMessage>* batch = reinterpret_cast<vector<MQMessage>*>(batchMsg);
+  MQMessage* source = (MQMessage*)msg;
+  // push_back copies *source, so the batch holds its own MQMessage.
+  batch->push_back(*source);
+  return OK;
+}
+int DestroyBatchMessage(CBatchMessage* batchMsg) {
+  // Frees the vector<MQMessage> behind the handle; a NULL handle yields NULL_POINTER.
+  vector<MQMessage>* batch = reinterpret_cast<vector<MQMessage>*>(batchMsg);
+  if (batch == NULL) return NULL_POINTER;
+  delete batch;
+  return OK;
+}
+
+#ifdef __cplusplus
+};
+#endif
diff --git a/src/extern/CProducer.cpp b/src/extern/CProducer.cpp
index 59ee184..265116e 100644
--- a/src/extern/CProducer.cpp
+++ b/src/extern/CProducer.cpp
@@ -17,6 +17,7 @@
 
 #include "DefaultMQProducer.h"
 #include "AsyncCallback.h"
+#include "CBatchMessage.h"
 #include "CProducer.h"
 #include "CCommon.h"
 #include "CSendResult.h"
@@ -156,6 +157,41 @@
   return OK;
 }
 
+int SendBatchMessage(CProducer* producer, CBatchMessage* batcMsg, CSendResult* result) {
+  // Sends the whole batch in one call and writes status, queue offset and message id to *result.
+  if (!producer || !batcMsg || !result) {
+    return NULL_POINTER;
+  }
+  try {
+    vector<MQMessage>* batch = (vector<MQMessage>*)batcMsg;
+    DefaultMQProducer* sender = (DefaultMQProducer*)producer;
+    SendResult outcome = sender->send(*batch);
+    // Map the C++ send status onto the C enum; a status without its own case is reported as E_SEND_OK.
+    switch (outcome.getSendStatus()) {
+      case SEND_FLUSH_DISK_TIMEOUT:
+        result->sendStatus = E_SEND_FLUSH_DISK_TIMEOUT;
+        break;
+      case SEND_FLUSH_SLAVE_TIMEOUT:
+        result->sendStatus = E_SEND_FLUSH_SLAVE_TIMEOUT;
+        break;
+      case SEND_SLAVE_NOT_AVAILABLE:
+        result->sendStatus = E_SEND_SLAVE_NOT_AVAILABLE;
+        break;
+      case SEND_OK:
+      default:
+        result->sendStatus = E_SEND_OK;
+        break;
+    }
+    result->offset = outcome.getQueueOffset();
+    // Copy at most MAX_MESSAGE_ID_LENGTH - 1 characters, then force the terminating 0.
+    strncpy(result->msgId, outcome.getMsgId().c_str(), MAX_MESSAGE_ID_LENGTH - 1);
+    result->msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0;
+  } catch (exception&) {
+    return PRODUCER_SEND_SYNC_FAILED;
+  }
+  return OK;
+}
+
 int SendMessageAsync(CProducer* producer,
                      CMessage* msg,
                      CSendSuccessCallback cSendSuccessCallback,
@@ -346,4 +382,4 @@
 }
 #ifdef __cplusplus
 };
-#endif
+#endif
\ No newline at end of file
diff --git a/src/protocol/CommandHeader.cpp b/src/protocol/CommandHeader.cpp
index 4e4b0bb..95ef166 100644
--- a/src/protocol/CommandHeader.cpp
+++ b/src/protocol/CommandHeader.cpp
@@ -73,7 +73,7 @@
   outData["properties"] = properties;

   outData["reconsumeTimes"] = UtilAll::to_string(reconsumeTimes);

   outData["unitMode"] = UtilAll::to_string(unitMode);

-  // outData["batch"] = batch;

+  outData["batch"] = UtilAll::to_string(batch);

 }

 

 int SendMessageRequestHeader::getReconsumeTimes() {

@@ -106,7 +106,7 @@
   requestMap.insert(pair<string, string>("properties", properties));

   requestMap.insert(pair<string, string>("reconsumeTimes", UtilAll::to_string(reconsumeTimes)));

   requestMap.insert(pair<string, string>("unitMode", UtilAll::to_string(unitMode)));

-  // requestMap.insert(pair<string, string>("batch", UtilAll::to_string(batch)));

+  requestMap.insert(pair<string, string>("batch", UtilAll::to_string(batch)));

 }

 

 //<!************************************************************************

diff --git a/src/transport/TcpRemotingClient.cpp b/src/transport/TcpRemotingClient.cpp
index 5353b05..c114ecd 100755
--- a/src/transport/TcpRemotingClient.cpp
+++ b/src/transport/TcpRemotingClient.cpp
@@ -656,21 +656,5 @@
   m_asyncTimerTable.clear();
 }
 
-void TcpRemotingClient::deleteOpaqueForDropPullRequest(const MQMessageQueue& mq, int opaque) {
-  // delete the map record of opaque<->ResponseFuture, so the answer for the pull request will
-  // discard when receive it later
-  std::shared_ptr<ResponseFuture> pFuture(findAndDeleteAsyncResponseFuture(opaque));
-  if (!pFuture) {
-    pFuture = findAndDeleteResponseFuture(opaque);
-    if (pFuture) {
-      LOG_DEBUG("succ deleted the sync pullrequest for opaque:%d, mq:%s", opaque, mq.toString().data());
-    }
-  } else {
-    LOG_DEBUG("succ deleted the async pullrequest for opaque:%d, mq:%s", opaque, mq.toString().data());
-    // delete the timeout timer for opaque for pullrequest
-    cancelTimerCallback(opaque);
-  }
-}
-
 //<!************************************************************************
 }  // namespace rocketmq
diff --git a/src/transport/TcpRemotingClient.h b/src/transport/TcpRemotingClient.h
index dfacd76..ad73cd2 100755
--- a/src/transport/TcpRemotingClient.h
+++ b/src/transport/TcpRemotingClient.h
@@ -58,8 +58,6 @@
 
   void registerProcessor(MQRequestCode requestCode, ClientRemotingProcessor* clientRemotingProcessor);
 
-  void deleteOpaqueForDropPullRequest(const MQMessageQueue& mq, int opaque);
-
  private:
   static void static_messageReceived(void* context, const MemoryBlock& mem, const string& addr);