Merge pull request #53 from ShannonDing/format

[ISSUE #45] Fix unsafe code, build warnings and format code style
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 3a21ab4..e9d5ca2 100755
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -139,6 +139,8 @@
+ -Wno-unused-local-typedef
+ -Wno-expansion-to-defined
  # -finline-limit=1000
  # -Wextra
  # -pedantic
diff --git a/include/DefaultMQProducer.h b/include/DefaultMQProducer.h
index 25799b6..02f5651 100755
--- a/include/DefaultMQProducer.h
+++ b/include/DefaultMQProducer.h
@@ -100,7 +100,7 @@
   int m_sendMsgTimeout;

   int m_compressMsgBodyOverHowmuch;

   int m_maxMessageSize;  //<! default:128K;

-  bool m_retryAnotherBrokerWhenNotStoreOK;

+  //bool m_retryAnotherBrokerWhenNotStoreOK;

   int m_compressLevel;

   int m_retryTimes;

   int m_retryTimes4Async;  

diff --git a/src/common/sync_http_client.h b/src/common/sync_http_client.h
index b25cc77..9836605 100755
--- a/src/common/sync_http_client.h
+++ b/src/common/sync_http_client.h
@@ -15,7 +15,7 @@
  * limitations under the License.
 #include <string>
diff --git a/src/consumer/DefaultMQPullConsumer.cpp b/src/consumer/DefaultMQPullConsumer.cpp
index 4aa33f6..073a801 100755
--- a/src/consumer/DefaultMQPullConsumer.cpp
+++ b/src/consumer/DefaultMQPullConsumer.cpp
@@ -58,7 +58,8 @@
 void DefaultMQPullConsumer::start() {
 #ifndef WIN32
   /* Ignore the SIGPIPE */
-  struct sigaction sa = {0};
+  struct sigaction sa;
+  memset(&sa,0, sizeof(struct sigaction));
   sa.sa_handler = SIG_IGN;
   sa.sa_flags = 0;
   sigaction(SIGPIPE, &sa, 0);
diff --git a/src/consumer/DefaultMQPushConsumer.cpp b/src/consumer/DefaultMQPushConsumer.cpp
index 96e5ec2..159e0ff 100644
--- a/src/consumer/DefaultMQPushConsumer.cpp
+++ b/src/consumer/DefaultMQPushConsumer.cpp
@@ -1,946 +1,963 @@

- * Licensed to the Apache Software Foundation (ASF) under one or more

- * contributor license agreements.  See the NOTICE file distributed with

- * this work for additional information regarding copyright ownership.

- * The ASF licenses this file to You under the Apache License, Version 2.0

- * (the "License"); you may not use this file except in compliance with

- * the License.  You may obtain a copy of the License at

- *

- *

- *

- * Unless required by applicable law or agreed to in writing, software

- * distributed under the License is distributed on an "AS IS" BASIS,

- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

- * See the License for the specific language governing permissions and

- * limitations under the License.

- */


-#include "DefaultMQPushConsumer.h"

-#include "CommunicationMode.h"

-#include "ConsumeMsgService.h"

-#include "ConsumerRunningInfo.h"

-#include "FilterAPI.h"

-#include "Logging.h"

-#include "MQClientAPIImpl.h"

-#include "MQClientFactory.h"

-#include "MQClientManager.h"

-#include "MQProtos.h"

-#include "OffsetStore.h"

-#include "PullAPIWrapper.h"

-#include "PullSysFlag.h"

-#include "Rebalance.h"

-#include "UtilAll.h"

-#include "Validators.h"

-#include "task_queue.h"


-namespace rocketmq {


-class AsyncPullCallback : public PullCallback {

- public:

-  AsyncPullCallback(DefaultMQPushConsumer* pushConsumer, PullRequest* request)

-      : m_callbackOwner(pushConsumer),

-        m_pullRequest(request),

-        m_bShutdown(false) {}

-  virtual ~AsyncPullCallback() {

-    m_callbackOwner = NULL;

-    m_pullRequest = NULL;

-  }

-  virtual void onSuccess(MQMessageQueue& mq, PullResult& result,

-                         bool bProducePullRequest) {

-    if (m_bShutdown == true) {

-      LOG_INFO("pullrequest for:%s in shutdown, return",

-               (m_pullRequest->m_messageQueue).toString().c_str());

-      m_pullRequest->removePullMsgEvent();

-      return;

-    }


-    switch (result.pullStatus) {

-      case FOUND: {

-        if (!m_pullRequest->isDroped())  // if request is setted to dropped,

-                                         // don't add msgFoundList to

-                                         // m_msgTreeMap and don't call

-                                         // producePullMsgTask

-        {  // avoid issue: pullMsg is sent out, rebalance is doing concurrently

-           // and this request is dropped, and then received pulled msgs.

-          m_pullRequest->setNextOffset(result.nextBeginOffset);

-          m_pullRequest->putMessage(result.msgFoundList);


-          m_callbackOwner->getConsumerMsgService()->submitConsumeRequest(

-              m_pullRequest, result.msgFoundList);


-          if (bProducePullRequest)

-            m_callbackOwner->producePullMsgTask(m_pullRequest);

-          else

-            m_pullRequest->removePullMsgEvent();


-          LOG_DEBUG("FOUND:%s with size:" SIZET_FMT ", nextBeginOffset:%lld",

-                    (m_pullRequest->m_messageQueue).toString().c_str(),

-                    result.msgFoundList.size(), result.nextBeginOffset);

-        } else {

-          LOG_INFO("remove pullmsg event of mq:%s",

-                   (m_pullRequest->m_messageQueue).toString().c_str());

-          m_pullRequest->removePullMsgEvent();

-        }

-        break;

-      }

-      case NO_NEW_MSG: {

-        m_pullRequest->setNextOffset(result.nextBeginOffset);


-        vector<MQMessageExt> msgs;

-        m_pullRequest->getMessage(msgs);

-        if ((msgs.size() == 0) && (result.nextBeginOffset > 0)) {

-          /*if broker losted/cleared msgs of one msgQueue, but the brokerOffset

-          is kept, then consumer will enter following situation:

-          1>. get pull offset with 0 when do rebalance, and set

-          m_offsetTable[mq] to 0;

-          2>. NO_NEW_MSG or NO_MATCHED_MSG got when pullMessage, and nextBegin

-          offset increase by 800

-          3>. request->getMessage(msgs) always NULL

-          4>. we need update consumerOffset to nextBeginOffset indicated by

-          broker

-          but if really no new msg could be pulled, also go to this CASE


-          LOG_INFO("maybe misMatch between broker and client happens, update

-          consumerOffset to nextBeginOffset indicated by broker");*/

-          m_callbackOwner->updateConsumeOffset(m_pullRequest->m_messageQueue,

-                                               result.nextBeginOffset);

-        }

-        if (bProducePullRequest)

-          m_callbackOwner->producePullMsgTask(m_pullRequest);

-        else

-          m_pullRequest->removePullMsgEvent();


-        /*LOG_INFO("NO_NEW_MSG:%s,nextBeginOffset:%lld",

-                 (m_pullRequest->m_messageQueue).toString().c_str(),

-                 result.nextBeginOffset);*/

-        break;

-      }

-      case NO_MATCHED_MSG: {

-        m_pullRequest->setNextOffset(result.nextBeginOffset);


-        vector<MQMessageExt> msgs;

-        m_pullRequest->getMessage(msgs);

-        if ((msgs.size() == 0) && (result.nextBeginOffset > 0)) {

-          /*if broker losted/cleared msgs of one msgQueue, but the brokerOffset

-          is kept, then consumer will enter following situation:

-          1>. get pull offset with 0 when do rebalance, and set

-          m_offsetTable[mq] to 0;

-          2>. NO_NEW_MSG or NO_MATCHED_MSG got when pullMessage, and nextBegin

-          offset increase by 800

-          3>. request->getMessage(msgs) always NULL

-          4>. we need update consumerOffset to nextBeginOffset indicated by

-          broker

-          but if really no new msg could be pulled, also go to this CASE


-          LOG_INFO("maybe misMatch between broker and client happens, update

-          consumerOffset to nextBeginOffset indicated by broker");*/

-          m_callbackOwner->updateConsumeOffset(m_pullRequest->m_messageQueue,

-                                               result.nextBeginOffset);

-        }

-        if (bProducePullRequest)

-          m_callbackOwner->producePullMsgTask(m_pullRequest);

-        else

-          m_pullRequest->removePullMsgEvent();

-        /*LOG_INFO("NO_MATCHED_MSG:%s,nextBeginOffset:%lld",

-                 (m_pullRequest->m_messageQueue).toString().c_str(),

-                 result.nextBeginOffset);*/

-        break;

-      }

-      case OFFSET_ILLEGAL: {

-        m_pullRequest->setNextOffset(result.nextBeginOffset);

-        if (bProducePullRequest)

-          m_callbackOwner->producePullMsgTask(m_pullRequest);

-        else

-          m_pullRequest->removePullMsgEvent();


-        /*LOG_INFO("OFFSET_ILLEGAL:%s,nextBeginOffset:%lld",

-                 (m_pullRequest->m_messageQueue).toString().c_str(),

-                 result.nextBeginOffset);*/

-        break;

-      }

-      case BROKER_TIMEOUT: {  // as BROKER_TIMEOUT is defined by client, broker

-                              // will not returns this status, so this case

-                              // could not be entered.

-        LOG_ERROR("impossible BROKER_TIMEOUT Occurs");

-        m_pullRequest->setNextOffset(result.nextBeginOffset);

-        if (bProducePullRequest)

-          m_callbackOwner->producePullMsgTask(m_pullRequest);

-        else

-          m_pullRequest->removePullMsgEvent();

-        break;

-      }

-    }

-  }


-  virtual void onException(MQException& e) {

-    if (m_bShutdown == true) {

-      LOG_INFO("pullrequest for:%s in shutdown, return",

-               (m_pullRequest->m_messageQueue).toString().c_str());

-      m_pullRequest->removePullMsgEvent();

-      return;

-    }

-    LOG_WARN("pullrequest for:%s occurs exception, reproduce it",

-             (m_pullRequest->m_messageQueue).toString().c_str());

-    m_callbackOwner->producePullMsgTask(m_pullRequest);

-  }


-  void setShutdownStatus() { m_bShutdown = true; }


- private:

-  DefaultMQPushConsumer* m_callbackOwner;

-  PullRequest* m_pullRequest;

-  bool m_bShutdown;




-static boost::mutex m_asyncCallbackLock;

-DefaultMQPushConsumer::DefaultMQPushConsumer(const string& groupname)

-    : m_consumeFromWhere(CONSUME_FROM_LAST_OFFSET),

-      m_pOffsetStore(NULL),

-      m_pPullAPIWrapper(NULL),

-      m_pMessageListener(NULL),

-      m_consumeMessageBatchMaxSize(1),

-      m_maxMsgCacheSize(1000) {

-  //<!set default group name;

-  string gname = groupname.empty() ? DEFAULT_CONSUMER_GROUP : groupname;

-  setGroupName(gname);

-  m_asyncPull = true;

-  m_asyncPullTimeout = 30 * 1000;

-  setMessageModel(CLUSTERING);


-  m_startTime = UtilAll::currentTimeMillis();

-  m_consumeThreadCount = boost::thread::hardware_concurrency();

-  m_pullMsgThreadPoolNum = boost::thread::hardware_concurrency();

-  m_async_service_thread.reset(new boost::thread(

-      boost::bind(&DefaultMQPushConsumer::boost_asio_work, this)));



-void DefaultMQPushConsumer::boost_asio_work() {

-  LOG_INFO("DefaultMQPushConsumer::boost asio async service runing");

-  boost::asio::io_service::work work(m_async_ioService);  // avoid async io

-                                                          // service stops after

-                                                          // first timer timeout

-                                                          // callback




-DefaultMQPushConsumer::~DefaultMQPushConsumer() {

-  m_pMessageListener = NULL;

-  if (m_pullmsgQueue != NULL) {

-    deleteAndZero(m_pullmsgQueue);

-  }

-  if (m_pRebalance != NULL) {

-    deleteAndZero(m_pRebalance);

-  }

-  if (m_pOffsetStore != NULL) {

-    deleteAndZero(m_pOffsetStore);

-  }

-  if (m_pPullAPIWrapper != NULL) {

-    deleteAndZero(m_pPullAPIWrapper);

-  }

-  if (m_consumerService != NULL) {

-    deleteAndZero(m_consumerService);

-  }

-  PullMAP::iterator it = m_PullCallback.begin();

-  for (; it != m_PullCallback.end(); ++it) {

-    deleteAndZero(it->second);

-  }

-  m_PullCallback.clear();

-  m_subTopics.clear();



-void DefaultMQPushConsumer::sendMessageBack(MQMessageExt& msg, int delayLevel) {

-  try {

-    getFactory()->getMQClientAPIImpl()->consumerSendMessageBack(

-        msg, getGroupName(), delayLevel, 3000, getSessionCredentials());

-  } catch (MQException& e) {

-    LOG_ERROR(e.what());

-  }



-void DefaultMQPushConsumer::fetchSubscribeMessageQueues(

-    const string& topic, vector<MQMessageQueue>& mqs) {

-  mqs.clear();

-  try {

-    getFactory()->fetchSubscribeMessageQueues(topic, mqs,

-                                              getSessionCredentials());

-  } catch (MQException& e) {

-    LOG_ERROR(e.what());

-  }



-void DefaultMQPushConsumer::doRebalance() {

-  if (isServiceStateOk()) {

-    try {

-      m_pRebalance->doRebalance();

-    } catch (MQException& e) {

-      LOG_ERROR(e.what());

-    }

-  }



-void DefaultMQPushConsumer::persistConsumerOffset() {

-  if (isServiceStateOk()) {

-    m_pRebalance->persistConsumerOffset();

-  }



-void DefaultMQPushConsumer::persistConsumerOffsetByResetOffset() {

-  if (isServiceStateOk()) {

-    m_pRebalance->persistConsumerOffsetByResetOffset();

-  }



-void DefaultMQPushConsumer::start() {

-#ifndef WIN32

-  /* Ignore the SIGPIPE */

-  struct sigaction sa ={0};

-  sa.sa_handler = SIG_IGN;

-  sa.sa_flags = 0;

-  sigaction(SIGPIPE, &sa, 0);


-  switch (m_serviceState) {

-    case CREATE_JUST: {

-      m_serviceState = START_FAILED;

-      MQClient::start();

-      LOG_INFO("DefaultMQPushConsumer:%s start", m_GroupName.c_str());


-      //<!data;

-      checkConfig();


-      //<!create rebalance;

-      m_pRebalance = new RebalancePush(this, getFactory());


-      string groupname = getGroupName();

-      m_pPullAPIWrapper = new PullAPIWrapper(getFactory(), groupname);


-      if (m_pMessageListener) {

-        if (m_pMessageListener->getMessageListenerType() ==

-            messageListenerOrderly) {

-          LOG_INFO("start orderly consume service:%s", getGroupName().c_str());

-          m_consumerService = new ConsumeMessageOrderlyService(

-              this, m_consumeThreadCount, m_pMessageListener);

-        } else  // for backward compatible, defaultly and concurrently listeners

-                // are allocating ConsumeMessageConcurrentlyService

-        {

-          LOG_INFO("start concurrently consume service:%s",

-                   getGroupName().c_str());

-          m_consumerService = new ConsumeMessageConcurrentlyService(

-              this, m_consumeThreadCount, m_pMessageListener);

-        }

-      }


-      m_pullmsgQueue = new TaskQueue(m_pullMsgThreadPoolNum);

-      m_pullmsgThread.reset(new boost::thread(boost::bind(

-          &DefaultMQPushConsumer::runPullMsgQueue, this, m_pullmsgQueue)));


-      copySubscription();


-      //<! registe;

-      bool registerOK = getFactory()->registerConsumer(this);

-      if (!registerOK) {

-        m_serviceState = CREATE_JUST;


-            MQClientException,

-            "The cousumer group[" + getGroupName() +

-                "] has been created before, specify another name please.",

-            -1);

-      }


-      //<!msg model;

-      switch (getMessageModel()) {

-        case BROADCASTING:

-          m_pOffsetStore = new LocalFileOffsetStore(groupname, getFactory());

-          break;

-        case CLUSTERING:

-          m_pOffsetStore = new RemoteBrokerOffsetStore(groupname, getFactory());

-          break;

-      }

-      bool bStartFailed = false;

-      string errorMsg;

-      try {

-        m_pOffsetStore->load();

-      } catch (MQClientException& e) {

-        bStartFailed = true;

-        errorMsg = std::string(e.what());

-      }

-      m_consumerService->start();


-      getFactory()->start();


-      updateTopicSubscribeInfoWhenSubscriptionChanged();

-      getFactory()->sendHeartbeatToAllBroker();


-      m_serviceState = RUNNING;

-      if (bStartFailed) {

-        shutdown();

-        THROW_MQEXCEPTION(MQClientException, errorMsg, -1);

-      }

-      break;

-    }

-    case RUNNING:

-    case START_FAILED:


-      break;

-    default:

-      break;

-  }


-  getFactory()->rebalanceImmediately();



-void DefaultMQPushConsumer::shutdown() {

-  switch (m_serviceState) {

-    case RUNNING: {

-      LOG_INFO("DefaultMQPushConsumer shutdown");

-      m_async_ioService.stop();

-      m_async_service_thread->interrupt();

-      m_async_service_thread->join();

-      m_pullmsgQueue->close();

-      m_pullmsgThread->interrupt();

-      m_pullmsgThread->join();

-      m_consumerService->shutdown();

-      persistConsumerOffset();

-      shutdownAsyncPullCallBack();  // delete aync pullMsg resources

-      getFactory()->unregisterConsumer(this);

-      getFactory()->shutdown();

-      m_serviceState = SHUTDOWN_ALREADY;

-      break;

-    }

-    case CREATE_JUST:


-      break;

-    default:

-      break;

-  }



-void DefaultMQPushConsumer::registerMessageListener(

-    MQMessageListener* pMessageListener) {

-  if (NULL != pMessageListener) {

-    m_pMessageListener = pMessageListener;

-  }



-MessageListenerType DefaultMQPushConsumer::getMessageListenerType() {

-  if (NULL != m_pMessageListener) {

-    return m_pMessageListener->getMessageListenerType();

-  }

-  return messageListenerDefaultly;



-ConsumeMsgService* DefaultMQPushConsumer::getConsumerMsgService() const {

-  return m_consumerService;



-OffsetStore* DefaultMQPushConsumer::getOffsetStore() const {

-  return m_pOffsetStore;



-Rebalance* DefaultMQPushConsumer::getRebalance() const { return m_pRebalance; }


-void DefaultMQPushConsumer::subscribe(const string& topic,

-                                      const string& subExpression) {

-  m_subTopics[topic] = subExpression;



-void DefaultMQPushConsumer::checkConfig() {

-  string groupname = getGroupName();

-  // check consumerGroup

-  Validators::checkGroup(groupname);


-  // consumerGroup

-  if (! {

-    THROW_MQEXCEPTION(MQClientException,

-                      "consumerGroup can not equal DEFAULT_CONSUMER", -1);

-  }


-  if (getMessageModel() != BROADCASTING && getMessageModel() != CLUSTERING) {

-    THROW_MQEXCEPTION(MQClientException, "messageModel is valid ", -1);

-  }


-  if (m_pMessageListener == NULL) {

-    THROW_MQEXCEPTION(MQClientException, "messageListener is null ", -1);

-  }



-void DefaultMQPushConsumer::copySubscription() {

-  map<string, string>::iterator it = m_subTopics.begin();

-  for (; it != m_subTopics.end(); ++it) {

-    LOG_INFO("buildSubscriptionData,:%s,%s", it->first.c_str(),

-             it->second.c_str());

-    unique_ptr<SubscriptionData> pSData(

-        FilterAPI::buildSubscriptionData(it->first, it->second));


-    m_pRebalance->setSubscriptionData(it->first, pSData.release());

-  }


-  switch (getMessageModel()) {


-      break;

-    case CLUSTERING: {

-      string retryTopic = UtilAll::getRetryTopic(getGroupName());


-      //<!this sub;

-      unique_ptr<SubscriptionData> pSData(

-          FilterAPI::buildSubscriptionData(retryTopic, SUB_ALL));


-      m_pRebalance->setSubscriptionData(retryTopic, pSData.release());

-      break;

-    }

-    default:

-      break;

-  }



-void DefaultMQPushConsumer::updateTopicSubscribeInfo(

-    const string& topic, vector<MQMessageQueue>& info) {

-  m_pRebalance->setTopicSubscribeInfo(topic, info);



-void DefaultMQPushConsumer::updateTopicSubscribeInfoWhenSubscriptionChanged() {

-  map<string, SubscriptionData*>& subTable =

-      m_pRebalance->getSubscriptionInner();

-  map<string, SubscriptionData*>::iterator it = subTable.begin();

-  for (; it != subTable.end(); ++it) {

-    bool btopic = getFactory()->updateTopicRouteInfoFromNameServer(

-        it->first, getSessionCredentials());

-    if (btopic == false) {

-      LOG_WARN("The topic:[%s] not exist", it->first.c_str());

-    }

-  }



-ConsumeType DefaultMQPushConsumer::getConsumeType() {




-ConsumeFromWhere DefaultMQPushConsumer::getConsumeFromWhere() {

-  return m_consumeFromWhere;



-void DefaultMQPushConsumer::setConsumeFromWhere(

-    ConsumeFromWhere consumeFromWhere) {

-  m_consumeFromWhere = consumeFromWhere;



-void DefaultMQPushConsumer::getSubscriptions(vector<SubscriptionData>& result) {

-  map<string, SubscriptionData*>& subTable =

-      m_pRebalance->getSubscriptionInner();

-  map<string, SubscriptionData*>::iterator it = subTable.begin();

-  for (; it != subTable.end(); ++it) {

-    result.push_back(*(it->second));

-  }



-void DefaultMQPushConsumer::updateConsumeOffset(const MQMessageQueue& mq,

-                                                int64 offset) {

-  if (offset >= 0) {

-    m_pOffsetStore->updateOffset(mq, offset);

-  } else {

-    LOG_ERROR("updateConsumeOffset of mq:%s error", mq.toString().c_str());

-  }


-void DefaultMQPushConsumer::removeConsumeOffset(const MQMessageQueue& mq) {

-  m_pOffsetStore->removeOffset(mq);



-void DefaultMQPushConsumer::triggerNextPullRequest(

-    boost::asio::deadline_timer* t, PullRequest* request) {

-  // LOG_INFO("trigger pullrequest for:%s",

-  // (request->m_messageQueue).toString().c_str());

-  producePullMsgTask(request);

-  deleteAndZero(t);



-void DefaultMQPushConsumer::producePullMsgTask(PullRequest* request) {

-  if (m_pullmsgQueue->bTaskQueueStatusOK() && isServiceStateOk()) {

-    request->addPullMsgEvent();

-    if (m_asyncPull) {

-      m_pullmsgQueue->produce(TaskBinder::gen(

-          &DefaultMQPushConsumer::pullMessageAsync, this, request));

-    } else {

-      m_pullmsgQueue->produce(

-          TaskBinder::gen(&DefaultMQPushConsumer::pullMessage, this, request));

-    }

-  } else {

-    LOG_WARN("produce pullmsg of mq:%s failed",

-             request->m_messageQueue.toString().c_str());

-  }



-void DefaultMQPushConsumer::runPullMsgQueue(TaskQueue* pTaskQueue) {

-  pTaskQueue->run();



-void DefaultMQPushConsumer::pullMessage(PullRequest* request) {

-  if (request == NULL || request->isDroped()) {

-    LOG_WARN("Pull request is set drop with mq:%s, return",

-             (request->m_messageQueue).toString().c_str());

-    request->removePullMsgEvent();

-    return;

-  }


-  MQMessageQueue& messageQueue = request->m_messageQueue;

-  if (m_consumerService->getConsumeMsgSerivceListenerType() ==

-      messageListenerOrderly) {

-    if (!request->isLocked() || request->isLockExpired()) {

-      if (!m_pRebalance->lock(messageQueue)) {

-        producePullMsgTask(request);

-        return;

-      }

-    }

-  }


-  if (request->getCacheMsgCount() > m_maxMsgCacheSize) {

-    // LOG_INFO("retry pullrequest for:%s after 1s, as cachMsgSize:%d is larger

-    // than:%d",  (request->m_messageQueue).toString().c_str(),

-    // request->getCacheMsgCount(), m_maxMsgCacheSize);

-    boost::asio::deadline_timer* t = new boost::asio::deadline_timer(

-        m_async_ioService, boost::posix_time::milliseconds(1 * 1000));

-    t->async_wait(boost::bind(&DefaultMQPushConsumer::triggerNextPullRequest,

-                              this, t, request));

-    return;

-  }


-  bool commitOffsetEnable = false;

-  int64 commitOffsetValue = 0;

-  if (CLUSTERING == getMessageModel()) {

-    commitOffsetValue = m_pOffsetStore->readOffset(

-        messageQueue, READ_FROM_MEMORY, getSessionCredentials());

-    if (commitOffsetValue > 0) {

-      commitOffsetEnable = true;

-    }

-  }


-  string subExpression;

-  SubscriptionData* pSdata =

-      m_pRebalance->getSubscriptionData(messageQueue.getTopic());

-  if (pSdata == NULL) {

-    producePullMsgTask(request);

-    return;

-  }

-  subExpression = pSdata->getSubString();


-  int sysFlag =

-      PullSysFlag::buildSysFlag(commitOffsetEnable,      // commitOffset

-                                false,                   // suspend

-                                !subExpression.empty(),  // subscription

-                                false);                  // class filter


-  try {

-    request->setLastPullTimestamp(UtilAll::currentTimeMillis());

-    unique_ptr<PullResult> result(

-        m_pPullAPIWrapper->pullKernelImpl(messageQueue,              // 1

-                                          subExpression,             // 2

-                                          pSdata->getSubVersion(),   // 3

-                                          request->getNextOffset(),  // 4

-                                          32,                        // 5

-                                          sysFlag,                   // 6

-                                          commitOffsetValue,         // 7

-                                          1000 * 15,                 // 8

-                                          1000 * 30,                 // 9

-                                          ComMode_SYNC,              // 10

-                                          NULL, getSessionCredentials()));


-    PullResult pullResult = m_pPullAPIWrapper->processPullResult(

-        messageQueue, result.get(), pSdata);


-    switch (pullResult.pullStatus) {

-      case FOUND: {

-        if (!request->isDroped())  // if request is setted to dropped, don't add

-                                   // msgFoundList to m_msgTreeMap and don't

-                                   // call producePullMsgTask

-        {  // avoid issue: pullMsg is sent out, rebalance is doing concurrently

-           // and this request is dropped, and then received pulled msgs.

-          request->setNextOffset(pullResult.nextBeginOffset);

-          request->putMessage(pullResult.msgFoundList);


-          m_consumerService->submitConsumeRequest(request,

-                                                  pullResult.msgFoundList);

-          producePullMsgTask(request);


-          LOG_DEBUG("FOUND:%s with size:" SIZET_FMT ",nextBeginOffset:%lld",

-                    messageQueue.toString().c_str(),

-                    pullResult.msgFoundList.size(), pullResult.nextBeginOffset);

-        } else {

-          request->removePullMsgEvent();

-        }

-        break;

-      }

-      case NO_NEW_MSG: {

-        request->setNextOffset(pullResult.nextBeginOffset);

-        vector<MQMessageExt> msgs;

-        request->getMessage(msgs);

-        if ((msgs.size() == 0) && (pullResult.nextBeginOffset > 0)) {

-          /*if broker losted/cleared msgs of one msgQueue, but the brokerOffset

-          is kept, then consumer will enter following situation:

-          1>. get pull offset with 0 when do rebalance, and set

-          m_offsetTable[mq] to 0;

-          2>. NO_NEW_MSG or NO_MATCHED_MSG got when pullMessage, and nextBegin

-          offset increase by 800

-          3>. request->getMessage(msgs) always NULL

-          4>. we need update consumerOffset to nextBeginOffset indicated by

-          broker

-          but if really no new msg could be pulled, also go to this CASE

-       */

-          // LOG_DEBUG("maybe misMatch between broker and client happens, update

-          // consumerOffset to nextBeginOffset indicated by broker");

-          updateConsumeOffset(messageQueue, pullResult.nextBeginOffset);

-        }

-        producePullMsgTask(request);

-        LOG_DEBUG("NO_NEW_MSG:%s,nextBeginOffset:%lld",

-                  messageQueue.toString().c_str(), pullResult.nextBeginOffset);

-        break;

-      }

-      case NO_MATCHED_MSG: {

-        request->setNextOffset(pullResult.nextBeginOffset);

-        vector<MQMessageExt> msgs;

-        request->getMessage(msgs);

-        if ((msgs.size() == 0) && (pullResult.nextBeginOffset > 0)) {

-          // LOG_DEBUG("maybe misMatch between broker and client happens, update

-          // consumerOffset to nextBeginOffset indicated by broker");

-          updateConsumeOffset(messageQueue, pullResult.nextBeginOffset);

-        }

-        producePullMsgTask(request);


-        LOG_DEBUG("NO_MATCHED_MSG:%s,nextBeginOffset:%lld",

-                  messageQueue.toString().c_str(), pullResult.nextBeginOffset);

-        break;

-      }

-      case OFFSET_ILLEGAL: {

-        request->setNextOffset(pullResult.nextBeginOffset);

-        producePullMsgTask(request);


-        LOG_DEBUG("OFFSET_ILLEGAL:%s,nextBeginOffset:%lld",

-                  messageQueue.toString().c_str(), pullResult.nextBeginOffset);

-        break;

-      }

-      case BROKER_TIMEOUT: {  // as BROKER_TIMEOUT is defined by client, broker

-                              // will not returns this status, so this case

-                              // could not be entered.

-        LOG_ERROR("impossible BROKER_TIMEOUT Occurs");

-        request->setNextOffset(pullResult.nextBeginOffset);

-        producePullMsgTask(request);

-        break;

-      }

-    }

-  } catch (MQException& e) {

-    LOG_ERROR(e.what());

-    producePullMsgTask(request);

-  }



-AsyncPullCallback* DefaultMQPushConsumer::getAsyncPullCallBack(

-    PullRequest* request, MQMessageQueue msgQueue) {

-  boost::lock_guard<boost::mutex> lock(m_asyncCallbackLock);

-  if (m_asyncPull && request) {

-    PullMAP::iterator it = m_PullCallback.find(msgQueue);

-    if (it == m_PullCallback.end()) {

-      LOG_INFO("new pull callback for mq:%s", msgQueue.toString().c_str());

-      m_PullCallback[msgQueue] = new AsyncPullCallback(this, request);

-    }

-    return m_PullCallback[msgQueue];

-  }


-  return NULL;



-void DefaultMQPushConsumer::shutdownAsyncPullCallBack() {

-  boost::lock_guard<boost::mutex> lock(m_asyncCallbackLock);

-  if (m_asyncPull) {

-    PullMAP::iterator it = m_PullCallback.begin();

-    for (; it != m_PullCallback.end(); ++it) {

-      if (it->second) {

-        it->second->setShutdownStatus();

-      } else {

-        LOG_ERROR("could not find asyncPullCallback for:%s",

-                  it->first.toString().c_str());

-      }

-    }

-  }



-void DefaultMQPushConsumer::pullMessageAsync(PullRequest* request) {

-  if (request == NULL || request->isDroped()) {

-    LOG_WARN("Pull request is set drop with mq:%s, return",

-             (request->m_messageQueue).toString().c_str());

-    request->removePullMsgEvent();

-    return;

-  }


-  MQMessageQueue& messageQueue = request->m_messageQueue;

-  if (m_consumerService->getConsumeMsgSerivceListenerType() ==

-      messageListenerOrderly) {

-    if (!request->isLocked() || request->isLockExpired()) {

-      if (!m_pRebalance->lock(messageQueue)) {

-        producePullMsgTask(request);

-        return;

-      }

-    }

-  }


-  if (request->getCacheMsgCount() > m_maxMsgCacheSize) {

-    // LOG_INFO("retry pullrequest for:%s after 1s, as cachMsgSize:%d is larger

-    // than:%d",  (request->m_messageQueue).toString().c_str(),

-    // request->getCacheMsgCount(), m_maxMsgCacheSize);

-    boost::asio::deadline_timer* t = new boost::asio::deadline_timer(

-        m_async_ioService, boost::posix_time::milliseconds(1 * 1000));

-    t->async_wait(boost::bind(&DefaultMQPushConsumer::triggerNextPullRequest,

-                              this, t, request));

-    return;

-  }


-  bool commitOffsetEnable = false;

-  int64 commitOffsetValue = 0;

-  if (CLUSTERING == getMessageModel()) {

-    commitOffsetValue = m_pOffsetStore->readOffset(

-        messageQueue, READ_FROM_MEMORY, getSessionCredentials());

-    if (commitOffsetValue > 0) {

-      commitOffsetEnable = true;

-    }

-  }


-  string subExpression;

-  SubscriptionData* pSdata =

-      (m_pRebalance->getSubscriptionData(messageQueue.getTopic()));

-  if (pSdata == NULL) {

-    producePullMsgTask(request);

-    return;

-  }

-  subExpression = pSdata->getSubString();


-  int sysFlag =

-      PullSysFlag::buildSysFlag(commitOffsetEnable,      // commitOffset

-                                true,                    // suspend

-                                !subExpression.empty(),  // subscription

-                                false);                  // class filter


-  AsyncArg arg;

- = messageQueue;

-  arg.subData = *pSdata;

-  arg.pPullWrapper = m_pPullAPIWrapper;


-  try {

-    request->setLastPullTimestamp(UtilAll::currentTimeMillis());

-    m_pPullAPIWrapper->pullKernelImpl(

-        messageQueue,                                 // 1

-        subExpression,                                // 2

-        pSdata->getSubVersion(),                      // 3

-        request->getNextOffset(),                     // 4

-        32,                                           // 5

-        sysFlag,                                      // 6

-        commitOffsetValue,                            // 7

-        1000 * 15,                                    // 8

-        m_asyncPullTimeout,                           // 9

-        ComMode_ASYNC,                                // 10

-        getAsyncPullCallBack(request, messageQueue),  // 11

-        getSessionCredentials(),                      // 12

-        &arg);                                        // 13

-  } catch (MQException& e) {

-    LOG_ERROR(e.what());

-    producePullMsgTask(request);

-  }



-void DefaultMQPushConsumer::setAsyncPull(bool asyncFlag) {

-  if (asyncFlag) {

-    LOG_INFO("set pushConsumer:%s to async default pull mode",

-             getGroupName().c_str());

-  } else {

-    LOG_INFO("set pushConsumer:%s to sync pull mode", getGroupName().c_str());

-  }

-  m_asyncPull = asyncFlag;



-void DefaultMQPushConsumer::setConsumeThreadCount(int threadCount) {

-  if (threadCount > 0) {

-    m_consumeThreadCount = threadCount;

-  } else {

-    LOG_ERROR("setConsumeThreadCount with invalid value");

-  }



-int DefaultMQPushConsumer::getConsumeThreadCount() const {

-  return m_consumeThreadCount;



-void DefaultMQPushConsumer::setPullMsgThreadPoolCount(int threadCount) {

-  m_pullMsgThreadPoolNum = threadCount;



-int DefaultMQPushConsumer::getPullMsgThreadPoolCount() const {

-  return m_pullMsgThreadPoolNum;



-int DefaultMQPushConsumer::getConsumeMessageBatchMaxSize() const {

-  return m_consumeMessageBatchMaxSize;



-void DefaultMQPushConsumer::setConsumeMessageBatchMaxSize(

-    int consumeMessageBatchMaxSize) {

-  if (consumeMessageBatchMaxSize >= 1)

-    m_consumeMessageBatchMaxSize = consumeMessageBatchMaxSize;



-void DefaultMQPushConsumer::setMaxCacheMsgSizePerQueue(int maxCacheSize) {

-  if (maxCacheSize > 0 && maxCacheSize < 65535) {

-    LOG_INFO("set maxCacheSize to:%d for consumer:%s", maxCacheSize,

-             getGroupName().c_str());

-    m_maxMsgCacheSize = maxCacheSize;

-  }



-int DefaultMQPushConsumer::getMaxCacheMsgSizePerQueue() const {

-  return m_maxMsgCacheSize;



-ConsumerRunningInfo* DefaultMQPushConsumer::getConsumerRunningInfo() {

-  ConsumerRunningInfo* info = new ConsumerRunningInfo();

-  if (info) {

-    if (m_consumerService->getConsumeMsgSerivceListenerType() ==

-        messageListenerOrderly)

-      info->setProperty(ConsumerRunningInfo::PROP_CONSUME_ORDERLY, "true");

-    else

-      info->setProperty(ConsumerRunningInfo::PROP_CONSUME_ORDERLY, "flase");

-    info->setProperty(ConsumerRunningInfo::PROP_THREADPOOL_CORE_SIZE,

-                      UtilAll::to_string(m_consumeThreadCount));

-    info->setProperty(ConsumerRunningInfo::PROP_CONSUMER_START_TIMESTAMP,

-                      UtilAll::to_string(m_startTime));


-    vector<SubscriptionData> result;

-    getSubscriptions(result);

-    info->setSubscriptionSet(result);


-    map<MQMessageQueue, PullRequest*> requestTable =

-        m_pRebalance->getPullRequestTable();

-    map<MQMessageQueue, PullRequest*>::iterator it = requestTable.begin();


-    for (; it != requestTable.end(); ++it) {

-      if (!it->second->isDroped()) {

-        map<MessageQueue, ProcessQueueInfo> queueTable;

-        MessageQueue queue((it->first).getTopic(), (it->first).getBrokerName(),

-                           (it->first).getQueueId());

-        ProcessQueueInfo processQueue;

-        processQueue.cachedMsgMinOffset = it->second->getCacheMinOffset();

-        processQueue.cachedMsgMaxOffset = it->second->getCacheMaxOffset();

-        processQueue.cachedMsgCount = it->second->getCacheMsgCount();

-        processQueue.setCommitOffset(m_pOffsetStore->readOffset(

-            it->first, MEMORY_FIRST_THEN_STORE, getSessionCredentials()));

-        processQueue.setDroped(it->second->isDroped());

-        processQueue.setLocked(it->second->isLocked());

-        processQueue.lastLockTimestamp = it->second->getLastLockTimestamp();

-        processQueue.lastPullTimestamp = it->second->getLastPullTimestamp();

-        processQueue.lastConsumeTimestamp =

-            it->second->getLastConsumeTimestamp();

-        info->setMqTable(queue, processQueue);

-      }

-    }


-    return info;

-  }

-  return NULL;




-}  //<!end namespace;

+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "DefaultMQPushConsumer.h"
+#include "CommunicationMode.h"
+#include "ConsumeMsgService.h"
+#include "ConsumerRunningInfo.h"
+#include "FilterAPI.h"
+#include "Logging.h"
+#include "MQClientAPIImpl.h"
+#include "MQClientFactory.h"
+#include "MQClientManager.h"
+#include "MQProtos.h"
+#include "OffsetStore.h"
+#include "PullAPIWrapper.h"
+#include "PullSysFlag.h"
+#include "Rebalance.h"
+#include "UtilAll.h"
+#include "Validators.h"
+#include "task_queue.h"
+namespace rocketmq {
+    /**
+     * Pull callback bound to one PullRequest of a DefaultMQPushConsumer.
+     *
+     * onSuccess() updates the request according to the pull status and then
+     * either queues the next pull task on the owning consumer or releases the
+     * request's pull event. onException() re-queues the pull. After
+     * setShutdownStatus() has been called, both callbacks only release the pull
+     * event and return.
+     *
+     * Neither pointer is deleted by this class; the destructor just clears them.
+     */
+    class AsyncPullCallback : public PullCallback {
+    public:
+        AsyncPullCallback(DefaultMQPushConsumer *pushConsumer, PullRequest *request)
+                : m_callbackOwner(pushConsumer),
+                  m_pullRequest(request),
+                  m_bShutdown(false) {}
+        virtual ~AsyncPullCallback() {
+            m_callbackOwner = NULL;
+            m_pullRequest = NULL;
+        }
+        /**
+         * Handles a completed pull.
+         *
+         * @param mq                  queue the result belongs to (not read here;
+         *                            the queue stored in the request is used).
+         * @param result              pull status, next begin offset and messages.
+         * @param bProducePullRequest true to queue the next pull task, false to
+         *                            only release the request's pull event.
+         */
+        virtual void onSuccess(MQMessageQueue &mq, PullResult &result,
+                               bool bProducePullRequest) {
+            if (releaseIfShutdown()) {
+                return;
+            }
+            switch (result.pullStatus) {
+                case FOUND: {
+                    // The pull may have been sent before a concurrent rebalance
+                    // dropped this request; in that case the received messages
+                    // are neither cached nor consumed and no new pull is queued.
+                    if (m_pullRequest->isDroped()) {
+                        LOG_INFO("remove pullmsg event of mq:%s",
+                                 (m_pullRequest->m_messageQueue).toString().c_str());
+                        m_pullRequest->removePullMsgEvent();
+                        break;
+                    }
+                    m_pullRequest->setNextOffset(result.nextBeginOffset);
+                    m_pullRequest->putMessage(result.msgFoundList);
+                    m_callbackOwner->getConsumerMsgService()->submitConsumeRequest(
+                            m_pullRequest, result.msgFoundList);
+                    scheduleNextPull(bProducePullRequest);
+                    LOG_DEBUG("FOUND:%s with size:"
+                                      SIZET_FMT
+                                      ", nextBeginOffset:%lld",
+                              (m_pullRequest->m_messageQueue).toString().c_str(),
+                              result.msgFoundList.size(), result.nextBeginOffset);
+                    break;
+                }
+                case NO_NEW_MSG:
+                case NO_MATCHED_MSG: {
+                    // Both statuses are handled identically.
+                    m_pullRequest->setNextOffset(result.nextBeginOffset);
+                    commitBrokerOffsetIfCacheEmpty(result);
+                    scheduleNextPull(bProducePullRequest);
+                    break;
+                }
+                case OFFSET_ILLEGAL: {
+                    m_pullRequest->setNextOffset(result.nextBeginOffset);
+                    scheduleNextPull(bProducePullRequest);
+                    break;
+                }
+                case BROKER_TIMEOUT: {
+                    // BROKER_TIMEOUT is defined by the client; the broker does
+                    // not return it, so this case is not expected to be entered.
+                    LOG_ERROR("impossible BROKER_TIMEOUT Occurs");
+                    m_pullRequest->setNextOffset(result.nextBeginOffset);
+                    scheduleNextPull(bProducePullRequest);
+                    break;
+                }
+            }
+        }
+        /**
+         * Handles a failed pull by queueing the same request again, unless the
+         * callback has been marked as shut down.
+         */
+        virtual void onException(MQException &e) {
+            if (releaseIfShutdown()) {
+                return;
+            }
+            LOG_WARN("pullrequest for:%s occurs exception, reproduce it",
+                     (m_pullRequest->m_messageQueue).toString().c_str());
+            m_callbackOwner->producePullMsgTask(m_pullRequest);
+        }
+        /** Marks the callback as shut down; later callbacks become no-ops. */
+        void setShutdownStatus() { m_bShutdown = true; }
+    private:
+        /**
+         * If the callback was shut down, logs, releases the request's pull event
+         * and returns true; otherwise returns false without side effects.
+         */
+        bool releaseIfShutdown() {
+            if (!m_bShutdown) {
+                return false;
+            }
+            LOG_INFO("pullrequest for:%s in shutdown, return",
+                     (m_pullRequest->m_messageQueue).toString().c_str());
+            m_pullRequest->removePullMsgEvent();
+            return true;
+        }
+        /** Queues the next pull task, or releases the pull event when not asked to. */
+        void scheduleNextPull(bool bProducePullRequest) {
+            if (bProducePullRequest) {
+                m_callbackOwner->producePullMsgTask(m_pullRequest);
+            } else {
+                m_pullRequest->removePullMsgEvent();
+            }
+        }
+        /**
+         * When the request caches no messages and the broker reports a positive
+         * next begin offset, records that offset as the consume offset.
+         *
+         * Rationale carried over from the original comment: if the broker lost
+         * or cleared the messages of a queue but kept its offset, the consumer
+         * starts at offset 0 after rebalance, keeps receiving NO_NEW_MSG /
+         * NO_MATCHED_MSG with a growing next begin offset, and its local cache
+         * stays empty, so the consume offset has to follow the broker. The same
+         * path is also taken when there simply is nothing new to pull.
+         */
+        void commitBrokerOffsetIfCacheEmpty(PullResult &result) {
+            vector<MQMessageExt> cached;
+            m_pullRequest->getMessage(cached);
+            if (cached.empty() && (result.nextBeginOffset > 0)) {
+                m_callbackOwner->updateConsumeOffset(m_pullRequest->m_messageQueue,
+                                                     result.nextBeginOffset);
+            }
+        }
+        DefaultMQPushConsumer *m_callbackOwner;  // consumer that queues the next pull
+        PullRequest *m_pullRequest;              // request this callback serves
+        bool m_bShutdown;                        // set once by setShutdownStatus()
+    };
+    // File-local mutex. NOTE(review): its users are not visible next to this
+    // declaration; presumably it guards the m_PullCallback table - confirm.
+    static boost::mutex m_asyncCallbackLock;
+    /**
+     * Creates a push consumer for the given consumer group.
+     *
+     * An empty group name is replaced by DEFAULT_CONSUMER_GROUP. The consumer
+     * defaults to asynchronous pulling with a 30 s async pull timeout and the
+     * CLUSTERING message model; both thread counts default to
+     * boost::thread::hardware_concurrency(). The thread that services
+     * m_async_ioService is started here.
+     *
+     * @param groupname consumer group name, may be empty.
+     */
+    DefaultMQPushConsumer::DefaultMQPushConsumer(const string &groupname)
+            : m_consumeFromWhere(CONSUME_FROM_LAST_OFFSET),
+              m_pOffsetStore(NULL),
+              m_pPullAPIWrapper(NULL),
+              m_pMessageListener(NULL),
+              m_consumeMessageBatchMaxSize(1),
+              m_maxMsgCacheSize(1000) {
+        // These pointers are only allocated in start(), but the destructor tests
+        // them against NULL and deletes them, so they need a defined value even
+        // when start() is never called. They are assigned here rather than in the
+        // initializer list so the list does not have to follow the (not visible
+        // here) member declaration order.
+        m_pRebalance = NULL;
+        m_consumerService = NULL;
+        m_pullmsgQueue = NULL;
+        //<!set default group name;
+        string gname = groupname.empty() ? DEFAULT_CONSUMER_GROUP : groupname;
+        setGroupName(gname);
+        m_asyncPull = true;
+        m_asyncPullTimeout = 30 * 1000;
+        setMessageModel(CLUSTERING);
+        m_startTime = UtilAll::currentTimeMillis();
+        m_consumeThreadCount = boost::thread::hardware_concurrency();
+        m_pullMsgThreadPoolNum = boost::thread::hardware_concurrency();
+        // Started last so every member above is set before the thread runs.
+        m_async_service_thread.reset(new boost::thread(
+                boost::bind(&DefaultMQPushConsumer::boost_asio_work, this)));
+    }
+    /**
+     * Entry point of m_async_service_thread: runs m_async_ioService so that
+     * handlers posted to it (the deadline timers armed in pullMessage) execute.
+     *
+     * Returns once shutdown() calls m_async_ioService.stop().
+     */
+    void DefaultMQPushConsumer::boost_asio_work() {
+        LOG_INFO("DefaultMQPushConsumer::boost asio async service runing");
+        // The work object keeps run() from returning when the io_service has no
+        // pending handlers, e.g. after the first timer callback completed.
+        boost::asio::io_service::work work(m_async_ioService);
+        // Without this call the thread would exit immediately and no timer
+        // handler would ever be dispatched.
+        m_async_ioService.run();
+    }
+    /**
+     * Releases the objects created by start() and every callback stored in
+     * m_PullCallback. The message listener is only detached, not deleted.
+     */
+    DefaultMQPushConsumer::~DefaultMQPushConsumer() {
+        // The listener pointer is cleared, never deleted, by this class.
+        m_pMessageListener = NULL;
+
+        if (NULL != m_pullmsgQueue) {
+            deleteAndZero(m_pullmsgQueue);
+        }
+        if (NULL != m_pRebalance) {
+            deleteAndZero(m_pRebalance);
+        }
+        if (NULL != m_pOffsetStore) {
+            deleteAndZero(m_pOffsetStore);
+        }
+        if (NULL != m_pPullAPIWrapper) {
+            deleteAndZero(m_pPullAPIWrapper);
+        }
+        if (NULL != m_consumerService) {
+            deleteAndZero(m_consumerService);
+        }
+
+        // Free every registered pull callback before dropping the table.
+        for (PullMAP::iterator entry = m_PullCallback.begin();
+             entry != m_PullCallback.end(); ++entry) {
+            deleteAndZero(entry->second);
+        }
+        m_PullCallback.clear();
+        m_subTopics.clear();
+    }
+    /**
+     * Sends a message back through the client API for this consumer group.
+     * An MQException is logged and swallowed; nothing is reported to the caller.
+     *
+     * @param msg        message to send back.
+     * @param delayLevel delay level forwarded to consumerSendMessageBack().
+     */
+    void DefaultMQPushConsumer::sendMessageBack(MQMessageExt &msg, int delayLevel) {
+        try {
+            // 3000 is forwarded as-is; presumably a timeout in ms - confirm.
+            getFactory()->getMQClientAPIImpl()->consumerSendMessageBack(
+                    msg, getGroupName(), delayLevel, 3000, getSessionCredentials());
+        } catch (MQException &e) {
+            // Log through "%s": the exception text must never be used as the
+            // format string itself, it may contain '%' sequences.
+            LOG_ERROR("%s", e.what());
+        }
+    }
+    /**
+     * Fills mqs with the message queues the client factory returns for topic.
+     * mqs is cleared first; on MQException the error is logged and mqs holds
+     * whatever the factory had written before throwing.
+     *
+     * @param topic topic to look up.
+     * @param mqs   output list of message queues.
+     */
+    void DefaultMQPushConsumer::fetchSubscribeMessageQueues(
+            const string &topic, vector<MQMessageQueue> &mqs) {
+        mqs.clear();
+        try {
+            getFactory()->fetchSubscribeMessageQueues(topic, mqs,
+                                                      getSessionCredentials());
+        } catch (MQException &e) {
+            // "%s" keeps '%' characters in the message from being interpreted
+            // as format directives.
+            LOG_ERROR("%s", e.what());
+        }
+    }
+    /**
+     * Runs one rebalance pass when isServiceStateOk() holds; otherwise does
+     * nothing. An MQException raised by the rebalance is logged and swallowed.
+     */
+    void DefaultMQPushConsumer::doRebalance() {
+        if (!isServiceStateOk()) {
+            return;
+        }
+        try {
+            m_pRebalance->doRebalance();
+        } catch (MQException &e) {
+            // Pass the text as an argument, not as the format string.
+            LOG_ERROR("%s", e.what());
+        }
+    }
+    /**
+     * Asks the rebalance object to persist consumer offsets; a no-op unless
+     * isServiceStateOk() holds.
+     */
+    void DefaultMQPushConsumer::persistConsumerOffset() {
+        if (!isServiceStateOk()) {
+            return;
+        }
+        m_pRebalance->persistConsumerOffset();
+    }
+    /**
+     * Forwards to Rebalance::persistConsumerOffsetByResetOffset(); a no-op
+     * unless isServiceStateOk() holds.
+     */
+    void DefaultMQPushConsumer::persistConsumerOffsetByResetOffset() {
+        if (!isServiceStateOk()) {
+            return;
+        }
+        m_pRebalance->persistConsumerOffsetByResetOffset();
+    }
+    /**
+     * Starts the consumer.
+     *
+     * Only the CREATE_JUST state does any work: it validates the configuration,
+     * creates the rebalance object, pull wrapper, consume service, pull task
+     * queue and offset store, registers the consumer with the client factory
+     * and starts the services. In every state an immediate rebalance is
+     * requested from the factory at the end.
+     *
+     * Throws MQClientException when the group is already registered or when
+     * loading the offset store failed (the consumer is shut down first in the
+     * latter case); checkConfig() may throw as well.
+     */
+    void DefaultMQPushConsumer::start() {
+#ifndef WIN32
+        /* Ignore the SIGPIPE */
+        struct sigaction sa;
+        memset(&sa,0, sizeof(struct sigaction));
+        sa.sa_handler = SIG_IGN;
+        sa.sa_flags = 0;
+        sigaction(SIGPIPE, &sa, 0);
+#endif
+        switch (m_serviceState) {
+            case CREATE_JUST: {
+                // Pessimistic state: stays START_FAILED if anything below throws
+                // before RUNNING is set.
+                m_serviceState = START_FAILED;
+                MQClient::start();
+                LOG_INFO("DefaultMQPushConsumer:%s start", m_GroupName.c_str());
+                // Validate group name, message model and listener.
+                checkConfig();
+                // Create the rebalance object and the pull wrapper.
+                m_pRebalance = new RebalancePush(this, getFactory());
+                string groupname = getGroupName();
+                m_pPullAPIWrapper = new PullAPIWrapper(getFactory(), groupname);
+                if (m_pMessageListener) {
+                    if (m_pMessageListener->getMessageListenerType() ==
+                        messageListenerOrderly) {
+                        LOG_INFO("start orderly consume service:%s", getGroupName().c_str());
+                        m_consumerService = new ConsumeMessageOrderlyService(
+                                this, m_consumeThreadCount, m_pMessageListener);
+                    } else  // for backward compatibility, default and concurrent
+                        // listeners both get ConsumeMessageConcurrentlyService
+                    {
+                        LOG_INFO("start concurrently consume service:%s",
+                                 getGroupName().c_str());
+                        m_consumerService = new ConsumeMessageConcurrentlyService(
+                                this, m_consumeThreadCount, m_pMessageListener);
+                    }
+                }
+                // Task queue that executes pull requests, driven by its own thread.
+                m_pullmsgQueue = new TaskQueue(m_pullMsgThreadPoolNum);
+                m_pullmsgThread.reset(new boost::thread(boost::bind(
+                        &DefaultMQPushConsumer::runPullMsgQueue, this, m_pullmsgQueue)));
+                copySubscription();
+                // Register this consumer with the client factory.
+                bool registerOK = getFactory()->registerConsumer(this);
+                if (!registerOK) {
+                    m_serviceState = CREATE_JUST;
+                    THROW_MQEXCEPTION(
+                            MQClientException,
+                            "The cousumer group[" + getGroupName() +
+                            "] has been created before, specify another name please.",
+                            -1);
+                }
+                // Pick the offset store that matches the message model.
+                switch (getMessageModel()) {
+                    case BROADCASTING:
+                        m_pOffsetStore = new LocalFileOffsetStore(groupname, getFactory());
+                        break;
+                    case CLUSTERING:
+                        m_pOffsetStore = new RemoteBrokerOffsetStore(groupname, getFactory());
+                        break;
+                }
+                // A load failure is remembered and reported only after the
+                // services were started, so that shutdown() can run from RUNNING.
+                bool bStartFailed = false;
+                string errorMsg;
+                try {
+                    m_pOffsetStore->load();
+                } catch (MQClientException &e) {
+                    bStartFailed = true;
+                    errorMsg = std::string(e.what());
+                }
+                m_consumerService->start();
+                getFactory()->start();
+                updateTopicSubscribeInfoWhenSubscriptionChanged();
+                getFactory()->sendHeartbeatToAllBroker();
+                m_serviceState = RUNNING;
+                if (bStartFailed) {
+                    shutdown();
+                    THROW_MQEXCEPTION(MQClientException, errorMsg, -1);
+                }
+                break;
+            }
+            case RUNNING:
+            case START_FAILED:
+            case SHUTDOWN_ALREADY:
+                break;
+            default:
+                break;
+        }
+        getFactory()->rebalanceImmediately();
+    }
+    /**
+     * Stops a RUNNING consumer; in any other state this is a no-op.
+     *
+     * Order: stop the io_service and join its thread, close the pull task queue
+     * and join its thread, shut down the consume service, persist offsets,
+     * release the async pull callbacks, then unregister from and shut down the
+     * client factory. The state ends up as SHUTDOWN_ALREADY.
+     */
+    void DefaultMQPushConsumer::shutdown() {
+        if (m_serviceState != RUNNING) {
+            return;
+        }
+        LOG_INFO("DefaultMQPushConsumer shutdown");
+
+        // Timer/io_service thread.
+        m_async_ioService.stop();
+        m_async_service_thread->interrupt();
+        m_async_service_thread->join();
+
+        // Pull task queue and the thread that runs it.
+        m_pullmsgQueue->close();
+        m_pullmsgThread->interrupt();
+        m_pullmsgThread->join();
+
+        m_consumerService->shutdown();
+        persistConsumerOffset();
+        shutdownAsyncPullCallBack();  // delete async pullMsg resources
+
+        getFactory()->unregisterConsumer(this);
+        getFactory()->shutdown();
+        m_serviceState = SHUTDOWN_ALREADY;
+    }
+    /**
+     * Stores the listener used for consumption. A NULL argument is ignored and
+     * leaves any previously registered listener in place.
+     */
+    void DefaultMQPushConsumer::registerMessageListener(
+            MQMessageListener *pMessageListener) {
+        if (pMessageListener == NULL) {
+            return;
+        }
+        m_pMessageListener = pMessageListener;
+    }
+    /**
+     * Returns the registered listener's type, or messageListenerDefaultly when
+     * no listener has been registered.
+     */
+    MessageListenerType DefaultMQPushConsumer::getMessageListenerType() {
+        if (m_pMessageListener == NULL) {
+            return messageListenerDefaultly;
+        }
+        return m_pMessageListener->getMessageListenerType();
+    }
+    /** Returns the consume service pointer held by this consumer (not owned by the caller). */
+    ConsumeMsgService *DefaultMQPushConsumer::getConsumerMsgService() const { return m_consumerService; }
+    /** Returns the offset store pointer held by this consumer; NULL until start() created it. */
+    OffsetStore *DefaultMQPushConsumer::getOffsetStore() const { return m_pOffsetStore; }
+    /** Returns the rebalance object pointer held by this consumer. */
+    Rebalance *DefaultMQPushConsumer::getRebalance() const {
+        return m_pRebalance;
+    }
+    /**
+     * Records (or overwrites) the subscription expression for a topic in
+     * m_subTopics; copySubscription() later reads this table.
+     *
+     * @param topic         topic name used as the table key.
+     * @param subExpression expression stored for that topic.
+     */
+    void DefaultMQPushConsumer::subscribe(const string &topic,
+                                          const string &subExpression) {
+        string &storedExpression = m_subTopics[topic];
+        storedExpression = subExpression;
+    }
+    /**
+     * Validates the configuration before start-up.
+     *
+     * Throws MQClientException when the group name equals
+     * DEFAULT_CONSUMER_GROUP, when the message model is neither BROADCASTING
+     * nor CLUSTERING, or when no message listener is registered.
+     * Validators::checkGroup() performs its own checks on the group name first.
+     */
+    void DefaultMQPushConsumer::checkConfig() {
+        string groupname = getGroupName();
+        // check consumerGroup
+        Validators::checkGroup(groupname);
+        // The default group name is what the constructor substitutes for an
+        // empty name, so it means the caller never chose a group.
+        if (groupname.compare(DEFAULT_CONSUMER_GROUP) == 0) {
+            THROW_MQEXCEPTION(MQClientException,
+                              "consumerGroup can not equal DEFAULT_CONSUMER", -1);
+        }
+        if (getMessageModel() != BROADCASTING && getMessageModel() != CLUSTERING) {
+            THROW_MQEXCEPTION(MQClientException, "messageModel is invalid ", -1);
+        }
+        if (m_pMessageListener == NULL) {
+            THROW_MQEXCEPTION(MQClientException, "messageListener is null ", -1);
+        }
+    }
+    /**
+     * Builds a SubscriptionData for every entry of m_subTopics and hands it to
+     * the rebalance object. In CLUSTERING mode the group's retry topic is
+     * additionally subscribed with SUB_ALL.
+     */
+    void DefaultMQPushConsumer::copySubscription() {
+        for (map<string, string>::iterator sub = m_subTopics.begin();
+             sub != m_subTopics.end(); ++sub) {
+            const string &topic = sub->first;
+            const string &expression = sub->second;
+            LOG_INFO("buildSubscriptionData,:%s,%s", topic.c_str(),
+                     expression.c_str());
+            unique_ptr<SubscriptionData> built(
+                    FilterAPI::buildSubscriptionData(topic, expression));
+            // The raw pointer is passed on to the rebalance object.
+            m_pRebalance->setSubscriptionData(topic, built.release());
+        }
+
+        // Only the clustering model subscribes to the retry topic.
+        if (getMessageModel() != CLUSTERING) {
+            return;
+        }
+        string retryTopic = UtilAll::getRetryTopic(getGroupName());
+        unique_ptr<SubscriptionData> retrySub(
+                FilterAPI::buildSubscriptionData(retryTopic, SUB_ALL));
+        m_pRebalance->setSubscriptionData(retryTopic, retrySub.release());
+    }
+    /**
+     * Forwards the message queues known for a topic to the rebalance object.
+     *
+     * @param topic topic the queues belong to.
+     * @param info  message queues passed to Rebalance::setTopicSubscribeInfo().
+     */
+    void DefaultMQPushConsumer::updateTopicSubscribeInfo(
+            const string &topic,
+            vector<MQMessageQueue> &info) {
+        m_pRebalance->setTopicSubscribeInfo(topic, info);
+    }
+    /**
+     * Asks the client factory to refresh the route info of every subscribed
+     * topic from the name server and logs a warning for each topic whose
+     * refresh reports failure.
+     */
+    void DefaultMQPushConsumer::updateTopicSubscribeInfoWhenSubscriptionChanged() {
+        map<string, SubscriptionData *> &subscriptions =
+                m_pRebalance->getSubscriptionInner();
+        for (map<string, SubscriptionData *>::iterator sub = subscriptions.begin();
+             sub != subscriptions.end(); ++sub) {
+            const bool refreshed = getFactory()->updateTopicRouteInfoFromNameServer(
+                    sub->first, getSessionCredentials());
+            if (!refreshed) {
+                LOG_WARN("The topic:[%s] not exist", sub->first.c_str());
+            }
+        }
+    }
+    /** Always reports CONSUME_PASSIVELY for this consumer class. */
+    ConsumeType DefaultMQPushConsumer::getConsumeType() { return CONSUME_PASSIVELY; }
+    /** Returns the configured start position; CONSUME_FROM_LAST_OFFSET unless changed. */
+    ConsumeFromWhere DefaultMQPushConsumer::getConsumeFromWhere() { return m_consumeFromWhere; }
+    /** Stores the start position later returned by getConsumeFromWhere(). */
+    void DefaultMQPushConsumer::setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) {
+        m_consumeFromWhere = consumeFromWhere;
+    }
+    /**
+     * Appends a copy of every SubscriptionData held by the rebalance object to
+     * result. result is not cleared first.
+     */
+    void DefaultMQPushConsumer::getSubscriptions(vector<SubscriptionData> &result) {
+        map<string, SubscriptionData *> &subscriptions =
+                m_pRebalance->getSubscriptionInner();
+        for (map<string, SubscriptionData *>::iterator sub = subscriptions.begin();
+             sub != subscriptions.end(); ++sub) {
+            const SubscriptionData *data = sub->second;
+            result.push_back(*data);
+        }
+    }
+    /**
+     * Records offset for mq in the offset store. Negative offsets are rejected
+     * with an error log and leave the store untouched.
+     */
+    void DefaultMQPushConsumer::updateConsumeOffset(const MQMessageQueue &mq,
+                                                    int64 offset) {
+        if (offset < 0) {
+            LOG_ERROR("updateConsumeOffset of mq:%s error", mq.toString().c_str());
+            return;
+        }
+        m_pOffsetStore->updateOffset(mq, offset);
+    }
+    /** Removes the offset kept for mq from the offset store. */
+    void DefaultMQPushConsumer::removeConsumeOffset(const MQMessageQueue &mq) { m_pOffsetStore->removeOffset(mq); }
+    /**
+     * Timer handler: queues the delayed pull request again and frees the timer.
+     *
+     * @param t       heap-allocated timer that invoked this handler; deleted here.
+     * @param request pull request to queue.
+     */
+    void DefaultMQPushConsumer::triggerNextPullRequest(
+            boost::asio::deadline_timer *t,
+            PullRequest *request) {
+        producePullMsgTask(request);
+        // The timer was allocated with new by whoever armed it and is
+        // single-shot, so it is released once its handler has run.
+        deleteAndZero(t);
+    }
+    /**
+     * Queues a pull for request on the pull task queue, using the asynchronous
+     * or synchronous pull routine depending on m_asyncPull. When the task queue
+     * or the service state is not OK, only a warning is logged.
+     */
+    void DefaultMQPushConsumer::producePullMsgTask(PullRequest *request) {
+        if (!m_pullmsgQueue->bTaskQueueStatusOK() || !isServiceStateOk()) {
+            LOG_WARN("produce pullmsg of mq:%s failed",
+                     request->m_messageQueue.toString().c_str());
+            return;
+        }
+
+        // Mark the request as having a pull in flight before queueing it.
+        request->addPullMsgEvent();
+        if (!m_asyncPull) {
+            m_pullmsgQueue->produce(
+                    TaskBinder::gen(&DefaultMQPushConsumer::pullMessage, this, request));
+            return;
+        }
+        m_pullmsgQueue->produce(TaskBinder::gen(
+                &DefaultMQPushConsumer::pullMessageAsync, this, request));
+    }
+    /** Entry point of m_pullmsgThread: runs the given task queue via TaskQueue::run(). */
+    void DefaultMQPushConsumer::runPullMsgQueue(TaskQueue *pTaskQueue) { pTaskQueue->run(); }
+    void DefaultMQPushConsumer::pullMessage(PullRequest *request) {
+        if (request == NULL) {
+            LOG_ERROR("Pull request is NULL, return");
+            return;
+        }
+        if (request->isDroped()) {
+            LOG_WARN("Pull request is set drop with mq:%s, return",
+                     (request->m_messageQueue).toString().c_str());
+            request->removePullMsgEvent();
+            return;
+        }
+        MQMessageQueue &messageQueue = request->m_messageQueue;
+        if (m_consumerService->getConsumeMsgSerivceListenerType() ==
+            messageListenerOrderly) {
+            if (!request->isLocked() || request->isLockExpired()) {
+                if (!m_pRebalance->lock(messageQueue)) {
+                    producePullMsgTask(request);
+                    return;
+                }
+            }
+        }
+        if (request->getCacheMsgCount() > m_maxMsgCacheSize) {
+            // LOG_INFO("retry pullrequest for:%s after 1s, as cachMsgSize:%d is larger
+            // than:%d",  (request->m_messageQueue).toString().c_str(),
+            // request->getCacheMsgCount(), m_maxMsgCacheSize);
+            boost::asio::deadline_timer *t = new boost::asio::deadline_timer(
+                    m_async_ioService, boost::posix_time::milliseconds(1 * 1000));
+            t->async_wait(boost::bind(&DefaultMQPushConsumer::triggerNextPullRequest,
+                                      this, t, request));
+            return;
+        }
+        bool commitOffsetEnable = false;
+        int64 commitOffsetValue = 0;
+        if (CLUSTERING == getMessageModel()) {
+            commitOffsetValue = m_pOffsetStore->readOffset(
+                    messageQueue, READ_FROM_MEMORY, getSessionCredentials());
+            if (commitOffsetValue > 0) {
+                commitOffsetEnable = true;
+            }
+        }
+        string subExpression;
+        SubscriptionData *pSdata =
+                m_pRebalance->getSubscriptionData(messageQueue.getTopic());
+        if (pSdata == NULL) {
+            producePullMsgTask(request);
+            return;
+        }
+        subExpression = pSdata->getSubString();
+        int sysFlag =
+                PullSysFlag::buildSysFlag(commitOffsetEnable,      // commitOffset
+                                          false,                   // suspend
+                                          !subExpression.empty(),  // subscription
+                                          false);                  // class filter
+        try {
+            request->setLastPullTimestamp(UtilAll::currentTimeMillis());
+            unique_ptr<PullResult> result(
+                    m_pPullAPIWrapper->pullKernelImpl(messageQueue,              // 1
+                                                      subExpression,             // 2
+                                                      pSdata->getSubVersion(),   // 3
+                                                      request->getNextOffset(),  // 4
+                                                      32,                        // 5
+                                                      sysFlag,                   // 6
+                                                      commitOffsetValue,         // 7
+                                                      1000 * 15,                 // 8
+                                                      1000 * 30,                 // 9
+                                                      ComMode_SYNC,              // 10
+                                                      NULL, getSessionCredentials()));
+            PullResult pullResult = m_pPullAPIWrapper->processPullResult(
+                    messageQueue, result.get(), pSdata);
+            switch (pullResult.pullStatus) {
+                case FOUND: {
+                    if (!request->isDroped())  // if request is setted to dropped, don't add
+                        // msgFoundList to m_msgTreeMap and don't
+                        // call producePullMsgTask
+                    {  // avoid issue: pullMsg is sent out, rebalance is doing concurrently
+                        // and this request is dropped, and then received pulled msgs.
+                        request->setNextOffset(pullResult.nextBeginOffset);
+                        request->putMessage(pullResult.msgFoundList);
+                        m_consumerService->submitConsumeRequest(request,
+                                                                pullResult.msgFoundList);
+                        producePullMsgTask(request);
+                        LOG_DEBUG("FOUND:%s with size:"
+                                          SIZET_FMT
+                                          ",nextBeginOffset:%lld",
+                                  messageQueue.toString().c_str(),
+                                  pullResult.msgFoundList.size(), pullResult.nextBeginOffset);
+                    } else {
+                        request->removePullMsgEvent();
+                    }
+                    break;
+                }
+                case NO_NEW_MSG: {
+                    request->setNextOffset(pullResult.nextBeginOffset);
+                    vector<MQMessageExt> msgs;
+                    request->getMessage(msgs);
+                    if ((msgs.size() == 0) && (pullResult.nextBeginOffset > 0)) {
+                        /*if broker losted/cleared msgs of one msgQueue, but the brokerOffset
+                        is kept, then consumer will enter following situation:
+                        1>. get pull offset with 0 when do rebalance, and set
+                        m_offsetTable[mq] to 0;
+                        2>. NO_NEW_MSG or NO_MATCHED_MSG got when pullMessage, and nextBegin
+                        offset increase by 800
+                        3>. request->getMessage(msgs) always NULL
+                        4>. we need update consumerOffset to nextBeginOffset indicated by
+                        broker
+                        but if really no new msg could be pulled, also go to this CASE
+                     */
+                        // LOG_DEBUG("maybe misMatch between broker and client happens, update
+                        // consumerOffset to nextBeginOffset indicated by broker");
+                        updateConsumeOffset(messageQueue, pullResult.nextBeginOffset);
+                    }
+                    producePullMsgTask(request);
+                    LOG_DEBUG("NO_NEW_MSG:%s,nextBeginOffset:%lld",
+                              messageQueue.toString().c_str(), pullResult.nextBeginOffset);
+                    break;
+                }
+                case NO_MATCHED_MSG: {
+                    request->setNextOffset(pullResult.nextBeginOffset);
+                    vector<MQMessageExt> msgs;
+                    request->getMessage(msgs);
+                    if ((msgs.size() == 0) && (pullResult.nextBeginOffset > 0)) {
+                        // LOG_DEBUG("maybe misMatch between broker and client happens, update
+                        // consumerOffset to nextBeginOffset indicated by broker");
+                        updateConsumeOffset(messageQueue, pullResult.nextBeginOffset);
+                    }
+                    producePullMsgTask(request);
+                    LOG_DEBUG("NO_MATCHED_MSG:%s,nextBeginOffset:%lld",
+                              messageQueue.toString().c_str(), pullResult.nextBeginOffset);
+                    break;
+                }
+                case OFFSET_ILLEGAL: {
+                    request->setNextOffset(pullResult.nextBeginOffset);
+                    producePullMsgTask(request);
+                    LOG_DEBUG("OFFSET_ILLEGAL:%s,nextBeginOffset:%lld",
+                              messageQueue.toString().c_str(), pullResult.nextBeginOffset);
+                    break;
+                }
+                case BROKER_TIMEOUT: {  // as BROKER_TIMEOUT is defined by client, broker
+                    // will not returns this status, so this case
+                    // could not be entered.
+                    LOG_ERROR("impossible BROKER_TIMEOUT Occurs");
+                    request->setNextOffset(pullResult.nextBeginOffset);
+                    producePullMsgTask(request);
+                    break;
+                }
+            }
+        } catch (MQException &e) {
+            LOG_ERROR(e.what());
+            producePullMsgTask(request);
+        }
+    }
+    // Returns the AsyncPullCallback registered for msgQueue, creating and
+    // registering a new one (bound to this consumer and the given request) when
+    // the queue has no entry yet. Returns NULL when async pull is disabled or
+    // request is NULL. The whole lookup/insert runs under m_asyncCallbackLock.
+    AsyncPullCallback *DefaultMQPushConsumer::getAsyncPullCallBack(
+            PullRequest *request, MQMessageQueue msgQueue) {
+        boost::lock_guard<boost::mutex> guard(m_asyncCallbackLock);
+        if (!m_asyncPull || request == NULL) {
+            return NULL;
+        }
+        PullMAP::iterator found = m_PullCallback.find(msgQueue);
+        if (found != m_PullCallback.end()) {
+            // An entry already exists for this queue: hand it back unchanged.
+            return found->second;
+        }
+        LOG_INFO("new pull callback for mq:%s", msgQueue.toString().c_str());
+        AsyncPullCallback *created = new AsyncPullCallback(this, request);
+        m_PullCallback[msgQueue] = created;
+        return created;
+    }
+    // Marks every registered async pull callback as shut down. Does nothing
+    // when async pull is disabled. Entries holding a NULL callback are only
+    // reported through LOG_ERROR. Runs under m_asyncCallbackLock.
+    void DefaultMQPushConsumer::shutdownAsyncPullCallBack() {
+        boost::lock_guard<boost::mutex> guard(m_asyncCallbackLock);
+        if (!m_asyncPull) {
+            return;
+        }
+        for (PullMAP::iterator entry = m_PullCallback.begin();
+             entry != m_PullCallback.end(); ++entry) {
+            if (entry->second == NULL) {
+                LOG_ERROR("could not find asyncPullCallback for:%s",
+                          entry->first.toString().c_str());
+                continue;
+            }
+            entry->second->setShutdownStatus();
+        }
+    }
+    // Issues one asynchronous pull for the queue described by `request`.
+    //
+    // Flow:
+    //  1. A NULL request is logged and ignored.
+    //  2. A dropped request has its pull event removed and is not pulled.
+    //  3. With an orderly listener, an unlocked/expired request must re-acquire
+    //     the queue lock through m_pRebalance; on failure the request is handed
+    //     back to producePullMsgTask.
+    //  4. When the request caches more than m_maxMsgCacheSize messages, the pull
+    //     is postponed by a 1 second timer that calls triggerNextPullRequest.
+    //  5. Otherwise the pull is sent through m_pPullAPIWrapper in ComMode_ASYNC
+    //     with the callback obtained from getAsyncPullCallBack.
+    // An MQException raised while sending is logged and the request is handed
+    // back to producePullMsgTask.
+    void DefaultMQPushConsumer::pullMessageAsync(PullRequest *request) {
+        if (request == NULL) {
+            LOG_ERROR("Pull request is NULL, return");
+            return;
+        }
+        if (request->isDroped()) {
+            LOG_WARN("Pull request is set drop with mq:%s, return",
+                     (request->m_messageQueue).toString().c_str());
+            request->removePullMsgEvent();
+            return;
+        }
+        MQMessageQueue &messageQueue = request->m_messageQueue;
+        if (m_consumerService->getConsumeMsgSerivceListenerType() ==
+            messageListenerOrderly) {
+            if (!request->isLocked() || request->isLockExpired()) {
+                if (!m_pRebalance->lock(messageQueue)) {
+                    producePullMsgTask(request);
+                    return;
+                }
+            }
+        }
+        if (request->getCacheMsgCount() > m_maxMsgCacheSize) {
+            // LOG_INFO("retry pullrequest for:%s after 1s, as cachMsgSize:%d is larger
+            // than:%d",  (request->m_messageQueue).toString().c_str(),
+            // request->getCacheMsgCount(), m_maxMsgCacheSize);
+            // NOTE(review): the timer is heap-allocated and passed to
+            // triggerNextPullRequest, which presumably deletes it - confirm.
+            boost::asio::deadline_timer *t = new boost::asio::deadline_timer(
+                    m_async_ioService, boost::posix_time::milliseconds(1 * 1000));
+            t->async_wait(boost::bind(&DefaultMQPushConsumer::triggerNextPullRequest,
+                                      this, t, request));
+            return;
+        }
+        // In CLUSTERING mode the in-memory consume offset is sent along with the
+        // pull, but only when it is positive.
+        bool commitOffsetEnable = false;
+        int64 commitOffsetValue = 0;
+        if (CLUSTERING == getMessageModel()) {
+            commitOffsetValue = m_pOffsetStore->readOffset(
+                    messageQueue, READ_FROM_MEMORY, getSessionCredentials());
+            if (commitOffsetValue > 0) {
+                commitOffsetEnable = true;
+            }
+        }
+        string subExpression;
+        SubscriptionData *pSdata =
+                (m_pRebalance->getSubscriptionData(messageQueue.getTopic()));
+        if (pSdata == NULL) {
+            // No subscription data for this topic: hand the request back.
+            producePullMsgTask(request);
+            return;
+        }
+        subExpression = pSdata->getSubString();
+        int sysFlag =
+                PullSysFlag::buildSysFlag(commitOffsetEnable,      // commitOffset
+                                          true,                    // suspend
+                                          !subExpression.empty(),  // subscription
+                                          false);                  // class filter
+        // Context handed to the async pull. The queue must be filled in together
+        // with the subscription data and the wrapper.
+        // NOTE(review): `arg` is a stack local passed by address; presumably
+        // pullKernelImpl copies it before returning - confirm.
+        AsyncArg arg;
+        arg.mq = messageQueue;
+        arg.subData = *pSdata;
+        arg.pPullWrapper = m_pPullAPIWrapper;
+        try {
+            request->setLastPullTimestamp(UtilAll::currentTimeMillis());
+            m_pPullAPIWrapper->pullKernelImpl(
+                    messageQueue,                                 // 1
+                    subExpression,                                // 2
+                    pSdata->getSubVersion(),                      // 3
+                    request->getNextOffset(),                     // 4
+                    32,                                           // 5
+                    sysFlag,                                      // 6
+                    commitOffsetValue,                            // 7
+                    1000 * 15,                                    // 8
+                    m_asyncPullTimeout,                           // 9
+                    ComMode_ASYNC,                                // 10
+                    getAsyncPullCallBack(request, messageQueue),  // 11
+                    getSessionCredentials(),                      // 12
+                    &arg);                                        // 13
+        } catch (MQException &e) {
+            // Never use the exception text as the format string itself: a '%'
+            // in the message would be interpreted as a conversion specifier.
+            LOG_ERROR("%s", e.what());
+            producePullMsgTask(request);
+        }
+    }
+    // Selects the pull mode (true = async, false = sync), logging the choice
+    // for this consumer group before storing it in m_asyncPull.
+    void DefaultMQPushConsumer::setAsyncPull(bool asyncFlag) {
+        if (!asyncFlag) {
+            LOG_INFO("set pushConsumer:%s to sync pull mode", getGroupName().c_str());
+        } else {
+            LOG_INFO("set pushConsumer:%s to async default pull mode",
+                     getGroupName().c_str());
+        }
+        m_asyncPull = asyncFlag;
+    }
+    // Stores the consume thread count. Non-positive values are rejected with
+    // an error log and leave the current setting untouched.
+    void DefaultMQPushConsumer::setConsumeThreadCount(int threadCount) {
+        if (threadCount <= 0) {
+            LOG_ERROR("setConsumeThreadCount with invalid value");
+            return;
+        }
+        m_consumeThreadCount = threadCount;
+    }
+    // Returns the currently configured consume thread count.
+    int DefaultMQPushConsumer::getConsumeThreadCount() const
+    {
+        return m_consumeThreadCount;
+    }
+    // Stores the pull-message thread pool size; the value is not validated.
+    void DefaultMQPushConsumer::setPullMsgThreadPoolCount(int threadCount)
+    {
+        m_pullMsgThreadPoolNum = threadCount;
+    }
+    // Returns the configured pull-message thread pool size.
+    int DefaultMQPushConsumer::getPullMsgThreadPoolCount() const
+    {
+        return m_pullMsgThreadPoolNum;
+    }
+    // Returns the configured maximum consume batch size.
+    int DefaultMQPushConsumer::getConsumeMessageBatchMaxSize() const
+    {
+        return m_consumeMessageBatchMaxSize;
+    }
+    // Stores the maximum consume batch size. Values below 1 are silently
+    // ignored and leave the current setting untouched.
+    void DefaultMQPushConsumer::setConsumeMessageBatchMaxSize(
+            int consumeMessageBatchMaxSize) {
+        if (consumeMessageBatchMaxSize < 1) {
+            return;
+        }
+        m_consumeMessageBatchMaxSize = consumeMessageBatchMaxSize;
+    }
+    // Stores the per-queue cache limit. Only values in the open interval
+    // (0, 65535) are accepted; anything else is silently ignored.
+    void DefaultMQPushConsumer::setMaxCacheMsgSizePerQueue(int maxCacheSize) {
+        if (maxCacheSize <= 0 || maxCacheSize >= 65535) {
+            return;
+        }
+        LOG_INFO("set maxCacheSize to:%d for consumer:%s", maxCacheSize,
+                 getGroupName().c_str());
+        m_maxMsgCacheSize = maxCacheSize;
+    }
+    // Returns the configured per-queue cache limit.
+    int DefaultMQPushConsumer::getMaxCacheMsgSizePerQueue() const
+    {
+        return m_maxMsgCacheSize;
+    }
+    // Builds a snapshot of this consumer's runtime state.
+    //
+    // The returned object carries:
+    //  - PROP_CONSUME_ORDERLY ("true"/"false") depending on the listener type,
+    //  - the consume thread count and the consumer start timestamp,
+    //  - the current subscriptions,
+    //  - one ProcessQueueInfo per pull request that is not dropped.
+    //
+    // Returns a ConsumerRunningInfo allocated with `new`.
+    // NOTE(review): ownership presumably passes to the caller - confirm.
+    ConsumerRunningInfo *DefaultMQPushConsumer::getConsumerRunningInfo() {
+        // Plain `new` throws on failure, so `info` is never NULL here.
+        ConsumerRunningInfo *info = new ConsumerRunningInfo();
+        if (m_consumerService->getConsumeMsgSerivceListenerType() ==
+            messageListenerOrderly)
+            info->setProperty(ConsumerRunningInfo::PROP_CONSUME_ORDERLY, "true");
+        else
+            info->setProperty(ConsumerRunningInfo::PROP_CONSUME_ORDERLY, "false");
+        info->setProperty(ConsumerRunningInfo::PROP_THREADPOOL_CORE_SIZE,
+                          UtilAll::to_string(m_consumeThreadCount));
+        info->setProperty(ConsumerRunningInfo::PROP_CONSUMER_START_TIMESTAMP,
+                          UtilAll::to_string(m_startTime));
+        vector<SubscriptionData> result;
+        getSubscriptions(result);
+        info->setSubscriptionSet(result);
+        // Iterate over a local copy of the pull request table.
+        map<MQMessageQueue, PullRequest *> requestTable =
+                m_pRebalance->getPullRequestTable();
+        map<MQMessageQueue, PullRequest *>::iterator it = requestTable.begin();
+        for (; it != requestTable.end(); ++it) {
+            if (!it->second->isDroped()) {
+                MessageQueue queue((it->first).getTopic(), (it->first).getBrokerName(),
+                                   (it->first).getQueueId());
+                ProcessQueueInfo processQueue;
+                processQueue.cachedMsgMinOffset = it->second->getCacheMinOffset();
+                processQueue.cachedMsgMaxOffset = it->second->getCacheMaxOffset();
+                processQueue.cachedMsgCount = it->second->getCacheMsgCount();
+                processQueue.setCommitOffset(m_pOffsetStore->readOffset(
+                        it->first, MEMORY_FIRST_THEN_STORE, getSessionCredentials()));
+                processQueue.setDroped(it->second->isDroped());
+                processQueue.setLocked(it->second->isLocked());
+                processQueue.lastLockTimestamp = it->second->getLastLockTimestamp();
+                processQueue.lastPullTimestamp = it->second->getLastPullTimestamp();
+                processQueue.lastConsumeTimestamp =
+                        it->second->getLastConsumeTimestamp();
+                info->setMqTable(queue, processQueue);
+            }
+        }
+        return info;
+    }
+}  //<!end namespace;
diff --git a/src/consumer/PullRequest.h b/src/consumer/PullRequest.h
index c62bd7d..f7abfaf 100644
--- a/src/consumer/PullRequest.h
+++ b/src/consumer/PullRequest.h
@@ -85,7 +85,7 @@
   map<int64, MQMessageExt> m_msgTreeMapTemp;
   boost::mutex m_pullRequestLock;
   uint64 m_lastLockTimestamp;  // ms
-  uint64 m_tryUnlockTimes;
+  //uint64 m_tryUnlockTimes;
   uint64 m_lastPullTimestamp;
   uint64 m_lastConsumeTimestamp;
   boost::timed_mutex m_consumeLock;
diff --git a/src/producer/DefaultMQProducer.cpp b/src/producer/DefaultMQProducer.cpp
index 748ffca..4edf811 100755
--- a/src/producer/DefaultMQProducer.cpp
+++ b/src/producer/DefaultMQProducer.cpp
@@ -37,7 +37,7 @@
     : m_sendMsgTimeout(3000),
       m_compressMsgBodyOverHowmuch(4 * 1024),
       m_maxMessageSize(1024 * 128),
-      m_retryAnotherBrokerWhenNotStoreOK(false),
+      //m_retryAnotherBrokerWhenNotStoreOK(false),
       m_retryTimes4Async(1) {
@@ -51,7 +51,8 @@
 void DefaultMQProducer::start() {
 #ifndef WIN32
   /* Ignore the SIGPIPE */
-  struct sigaction sa = {0};
+  struct sigaction sa;
+  memset(&sa,0, sizeof(struct sigaction));
   sa.sa_handler = SIG_IGN;
   sa.sa_flags = 0;
   sigaction(SIGPIPE, &sa, 0);
diff --git a/src/thread/disruptor/sequence.h b/src/thread/disruptor/sequence.h
index b3e5876..4fb61a4 100755
--- a/src/thread/disruptor/sequence.h
+++ b/src/thread/disruptor/sequence.h
@@ -102,9 +102,9 @@
     PaddedSequence(int64_t initial_value = kInitialCursorValue) :
             Sequence(initial_value) {}
- private:
+ //private:
     // padding
-    int64_t padding_[ATOMIC_SEQUENCE_PADDING_LENGTH];
+    //int64_t padding_[ATOMIC_SEQUENCE_PADDING_LENGTH];
@@ -133,8 +133,8 @@
      PaddedLong(int64_t initial_value = kInitialCursorValue) :
          MutableLong(initial_value) {}
- private:
-     int64_t padding_[SEQUENCE_PADDING_LENGTH];
+ //private:
+     //int64_t padding_[SEQUENCE_PADDING_LENGTH];
 int64_t GetMinimumSequence(
diff --git a/src/transport/TcpTransport.cpp b/src/transport/TcpTransport.cpp
index 62cc19e..011c420 100644
--- a/src/transport/TcpTransport.cpp
+++ b/src/transport/TcpTransport.cpp
@@ -30,12 +30,12 @@
 TcpTransport::TcpTransport(TcpRemotingClient *pTcpRemointClient,
                            READ_CALLBACK handle /* = NULL */)
     : m_tcpConnectStatus(e_connectInit),
-      m_ReadDatathread(NULL),
-      m_readcallback(handle),
-      m_tcpRemotingClient(pTcpRemointClient),
-      m_event_base_cv() {
+      m_event_base_cv(),
+      m_ReadDatathread(NULL),
+      m_readcallback(handle),
+      m_tcpRemotingClient(pTcpRemointClient){
   m_startTime = UtilAll::currentTimeMillis();
 #ifdef WIN32