Fix build warnings
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 3a21ab4..e9d5ca2 100755
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -139,6 +139,8 @@
  -fPIC
  -fno-strict-aliasing
  -std=c++11
+ -Wno-unused-local-typedef
+ -Wno-expansion-to-defined
  # -finline-limit=1000
  # -Wextra
  # -pedantic
diff --git a/include/DefaultMQProducer.h b/include/DefaultMQProducer.h
index 9a8f41f..b3ba2b2 100755
--- a/include/DefaultMQProducer.h
+++ b/include/DefaultMQProducer.h
@@ -97,7 +97,7 @@
   int m_sendMsgTimeout;

   int m_compressMsgBodyOverHowmuch;

   int m_maxMessageSize;  //<! default:128K;

-  bool m_retryAnotherBrokerWhenNotStoreOK;

+  //bool m_retryAnotherBrokerWhenNotStoreOK;

   int m_compressLevel;

   int m_retryTimes;

 };

diff --git a/src/common/sync_http_client.h b/src/common/sync_http_client.h
index b25cc77..9836605 100755
--- a/src/common/sync_http_client.h
+++ b/src/common/sync_http_client.h
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 #ifndef ROCKETMQ_CLIENT4CPP__SYNC_HTTP_CLIENT_H_
-#define ROCKETMQ_CLIENT4CPP_SYNC_HTTP_CLIENT_H_
+#define ROCKETMQ_CLIENT4CPP__SYNC_HTTP_CLIENT_H_
 
 #include <string>
 
diff --git a/src/consumer/DefaultMQPullConsumer.cpp b/src/consumer/DefaultMQPullConsumer.cpp
index 4aa33f6..073a801 100755
--- a/src/consumer/DefaultMQPullConsumer.cpp
+++ b/src/consumer/DefaultMQPullConsumer.cpp
@@ -58,7 +58,8 @@
 void DefaultMQPullConsumer::start() {
 #ifndef WIN32
   /* Ignore the SIGPIPE */
-  struct sigaction sa = {0};
+  struct sigaction sa;
+  memset(&sa,0, sizeof(struct sigaction));
   sa.sa_handler = SIG_IGN;
   sa.sa_flags = 0;
   sigaction(SIGPIPE, &sa, 0);
diff --git a/src/consumer/DefaultMQPushConsumer.cpp b/src/consumer/DefaultMQPushConsumer.cpp
index 8963691..159e0ff 100644
--- a/src/consumer/DefaultMQPushConsumer.cpp
+++ b/src/consumer/DefaultMQPushConsumer.cpp
@@ -1,962 +1,963 @@
-/*

- * Licensed to the Apache Software Foundation (ASF) under one or more

- * contributor license agreements.  See the NOTICE file distributed with

- * this work for additional information regarding copyright ownership.

- * The ASF licenses this file to You under the Apache License, Version 2.0

- * (the "License"); you may not use this file except in compliance with

- * the License.  You may obtain a copy of the License at

- *

- *     http://www.apache.org/licenses/LICENSE-2.0

- *

- * Unless required by applicable law or agreed to in writing, software

- * distributed under the License is distributed on an "AS IS" BASIS,

- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

- * See the License for the specific language governing permissions and

- * limitations under the License.

- */

-

-#include "DefaultMQPushConsumer.h"

-#include "CommunicationMode.h"

-#include "ConsumeMsgService.h"

-#include "ConsumerRunningInfo.h"

-#include "FilterAPI.h"

-#include "Logging.h"

-#include "MQClientAPIImpl.h"

-#include "MQClientFactory.h"

-#include "MQClientManager.h"

-#include "MQProtos.h"

-#include "OffsetStore.h"

-#include "PullAPIWrapper.h"

-#include "PullSysFlag.h"

-#include "Rebalance.h"

-#include "UtilAll.h"

-#include "Validators.h"

-#include "task_queue.h"

-

-namespace rocketmq {

-

-    class AsyncPullCallback : public PullCallback {

-    public:

-        AsyncPullCallback(DefaultMQPushConsumer *pushConsumer, PullRequest *request)

-                : m_callbackOwner(pushConsumer),

-                  m_pullRequest(request),

-                  m_bShutdown(false) {}

-

-        virtual ~AsyncPullCallback() {

-            m_callbackOwner = NULL;

-            m_pullRequest = NULL;

-        }

-

-        virtual void onSuccess(MQMessageQueue &mq, PullResult &result,

-                               bool bProducePullRequest) {

-            if (m_bShutdown == true) {

-                LOG_INFO("pullrequest for:%s in shutdown, return",

-                         (m_pullRequest->m_messageQueue).toString().c_str());

-                m_pullRequest->removePullMsgEvent();

-                return;

-            }

-

-            switch (result.pullStatus) {

-                case FOUND: {

-                    if (!m_pullRequest->isDroped())  // if request is setted to dropped,

-                        // don't add msgFoundList to

-                        // m_msgTreeMap and don't call

-                        // producePullMsgTask

-                    {  // avoid issue: pullMsg is sent out, rebalance is doing concurrently

-                        // and this request is dropped, and then received pulled msgs.

-                        m_pullRequest->setNextOffset(result.nextBeginOffset);

-                        m_pullRequest->putMessage(result.msgFoundList);

-

-                        m_callbackOwner->getConsumerMsgService()->submitConsumeRequest(

-                                m_pullRequest, result.msgFoundList);

-

-                        if (bProducePullRequest)

-                            m_callbackOwner->producePullMsgTask(m_pullRequest);

-                        else

-                            m_pullRequest->removePullMsgEvent();

-

-                        LOG_DEBUG("FOUND:%s with size:"

-                                          SIZET_FMT

-                                          ", nextBeginOffset:%lld",

-                                  (m_pullRequest->m_messageQueue).toString().c_str(),

-                                  result.msgFoundList.size(), result.nextBeginOffset);

-                    } else {

-                        LOG_INFO("remove pullmsg event of mq:%s",

-                                 (m_pullRequest->m_messageQueue).toString().c_str());

-                        m_pullRequest->removePullMsgEvent();

-                    }

-                    break;

-                }

-                case NO_NEW_MSG: {

-                    m_pullRequest->setNextOffset(result.nextBeginOffset);

-

-                    vector<MQMessageExt> msgs;

-                    m_pullRequest->getMessage(msgs);

-                    if ((msgs.size() == 0) && (result.nextBeginOffset > 0)) {

-                        /*if broker losted/cleared msgs of one msgQueue, but the brokerOffset

-                        is kept, then consumer will enter following situation:

-                        1>. get pull offset with 0 when do rebalance, and set

-                        m_offsetTable[mq] to 0;

-                        2>. NO_NEW_MSG or NO_MATCHED_MSG got when pullMessage, and nextBegin

-                        offset increase by 800

-                        3>. request->getMessage(msgs) always NULL

-                        4>. we need update consumerOffset to nextBeginOffset indicated by

-                        broker

-                        but if really no new msg could be pulled, also go to this CASE

-

-                        LOG_INFO("maybe misMatch between broker and client happens, update

-                        consumerOffset to nextBeginOffset indicated by broker");*/

-                        m_callbackOwner->updateConsumeOffset(m_pullRequest->m_messageQueue,

-                                                             result.nextBeginOffset);

-                    }

-                    if (bProducePullRequest)

-                        m_callbackOwner->producePullMsgTask(m_pullRequest);

-                    else

-                        m_pullRequest->removePullMsgEvent();

-

-                    /*LOG_INFO("NO_NEW_MSG:%s,nextBeginOffset:%lld",

-                             (m_pullRequest->m_messageQueue).toString().c_str(),

-                             result.nextBeginOffset);*/

-                    break;

-                }

-                case NO_MATCHED_MSG: {

-                    m_pullRequest->setNextOffset(result.nextBeginOffset);

-

-                    vector<MQMessageExt> msgs;

-                    m_pullRequest->getMessage(msgs);

-                    if ((msgs.size() == 0) && (result.nextBeginOffset > 0)) {

-                        /*if broker losted/cleared msgs of one msgQueue, but the brokerOffset

-                        is kept, then consumer will enter following situation:

-                        1>. get pull offset with 0 when do rebalance, and set

-                        m_offsetTable[mq] to 0;

-                        2>. NO_NEW_MSG or NO_MATCHED_MSG got when pullMessage, and nextBegin

-                        offset increase by 800

-                        3>. request->getMessage(msgs) always NULL

-                        4>. we need update consumerOffset to nextBeginOffset indicated by

-                        broker

-                        but if really no new msg could be pulled, also go to this CASE

-

-                        LOG_INFO("maybe misMatch between broker and client happens, update

-                        consumerOffset to nextBeginOffset indicated by broker");*/

-                        m_callbackOwner->updateConsumeOffset(m_pullRequest->m_messageQueue,

-                                                             result.nextBeginOffset);

-                    }

-                    if (bProducePullRequest)

-                        m_callbackOwner->producePullMsgTask(m_pullRequest);

-                    else

-                        m_pullRequest->removePullMsgEvent();

-                    /*LOG_INFO("NO_MATCHED_MSG:%s,nextBeginOffset:%lld",

-                             (m_pullRequest->m_messageQueue).toString().c_str(),

-                             result.nextBeginOffset);*/

-                    break;

-                }

-                case OFFSET_ILLEGAL: {

-                    m_pullRequest->setNextOffset(result.nextBeginOffset);

-                    if (bProducePullRequest)

-                        m_callbackOwner->producePullMsgTask(m_pullRequest);

-                    else

-                        m_pullRequest->removePullMsgEvent();

-

-                    /*LOG_INFO("OFFSET_ILLEGAL:%s,nextBeginOffset:%lld",

-                             (m_pullRequest->m_messageQueue).toString().c_str(),

-                             result.nextBeginOffset);*/

-                    break;

-                }

-                case BROKER_TIMEOUT: {  // as BROKER_TIMEOUT is defined by client, broker

-                    // will not returns this status, so this case

-                    // could not be entered.

-                    LOG_ERROR("impossible BROKER_TIMEOUT Occurs");

-                    m_pullRequest->setNextOffset(result.nextBeginOffset);

-                    if (bProducePullRequest)

-                        m_callbackOwner->producePullMsgTask(m_pullRequest);

-                    else

-                        m_pullRequest->removePullMsgEvent();

-                    break;

-                }

-            }

-        }

-

-        virtual void onException(MQException &e) {

-            if (m_bShutdown == true) {

-                LOG_INFO("pullrequest for:%s in shutdown, return",

-                         (m_pullRequest->m_messageQueue).toString().c_str());

-                m_pullRequest->removePullMsgEvent();

-                return;

-            }

-            LOG_WARN("pullrequest for:%s occurs exception, reproduce it",

-                     (m_pullRequest->m_messageQueue).toString().c_str());

-            m_callbackOwner->producePullMsgTask(m_pullRequest);

-        }

-

-        void setShutdownStatus() { m_bShutdown = true; }

-

-    private:

-        DefaultMQPushConsumer *m_callbackOwner;

-        PullRequest *m_pullRequest;

-        bool m_bShutdown;

-    };

-

-//<!***************************************************************************

-    static boost::mutex m_asyncCallbackLock;

-

-    DefaultMQPushConsumer::DefaultMQPushConsumer(const string &groupname)

-            : m_consumeFromWhere(CONSUME_FROM_LAST_OFFSET),

-              m_pOffsetStore(NULL),

-              m_pPullAPIWrapper(NULL),

-              m_pMessageListener(NULL),

-              m_consumeMessageBatchMaxSize(1),

-              m_maxMsgCacheSize(1000) {

-        //<!set default group name;

-        string gname = groupname.empty() ? DEFAULT_CONSUMER_GROUP : groupname;

-        setGroupName(gname);

-        m_asyncPull = true;

-        m_asyncPullTimeout = 30 * 1000;

-        setMessageModel(CLUSTERING);

-

-        m_startTime = UtilAll::currentTimeMillis();

-        m_consumeThreadCount = boost::thread::hardware_concurrency();

-        m_pullMsgThreadPoolNum = boost::thread::hardware_concurrency();

-        m_async_service_thread.reset(new boost::thread(

-                boost::bind(&DefaultMQPushConsumer::boost_asio_work, this)));

-    }

-

-    void DefaultMQPushConsumer::boost_asio_work() {

-        LOG_INFO("DefaultMQPushConsumer::boost asio async service runing");

-        boost::asio::io_service::work work(m_async_ioService);  // avoid async io

-        // service stops after

-        // first timer timeout

-        // callback

-        m_async_ioService.run();

-    }

-

-    DefaultMQPushConsumer::~DefaultMQPushConsumer() {

-        m_pMessageListener = NULL;

-        if (m_pullmsgQueue != NULL) {

-            deleteAndZero(m_pullmsgQueue);

-        }

-        if (m_pRebalance != NULL) {

-            deleteAndZero(m_pRebalance);

-        }

-        if (m_pOffsetStore != NULL) {

-            deleteAndZero(m_pOffsetStore);

-        }

-        if (m_pPullAPIWrapper != NULL) {

-            deleteAndZero(m_pPullAPIWrapper);

-        }

-        if (m_consumerService != NULL) {

-            deleteAndZero(m_consumerService);

-        }

-        PullMAP::iterator it = m_PullCallback.begin();

-        for (; it != m_PullCallback.end(); ++it) {

-            deleteAndZero(it->second);

-        }

-        m_PullCallback.clear();

-        m_subTopics.clear();

-    }

-

-    void DefaultMQPushConsumer::sendMessageBack(MQMessageExt &msg, int delayLevel) {

-        try {

-            getFactory()->getMQClientAPIImpl()->consumerSendMessageBack(

-                    msg, getGroupName(), delayLevel, 3000, getSessionCredentials());

-        } catch (MQException &e) {

-            LOG_ERROR(e.what());

-        }

-    }

-

-    void DefaultMQPushConsumer::fetchSubscribeMessageQueues(

-            const string &topic, vector<MQMessageQueue> &mqs) {

-        mqs.clear();

-        try {

-            getFactory()->fetchSubscribeMessageQueues(topic, mqs,

-                                                      getSessionCredentials());

-        } catch (MQException &e) {

-            LOG_ERROR(e.what());

-        }

-    }

-

-    void DefaultMQPushConsumer::doRebalance() {

-        if (isServiceStateOk()) {

-            try {

-                m_pRebalance->doRebalance();

-            } catch (MQException &e) {

-                LOG_ERROR(e.what());

-            }

-        }

-    }

-

-    void DefaultMQPushConsumer::persistConsumerOffset() {

-        if (isServiceStateOk()) {

-            m_pRebalance->persistConsumerOffset();

-        }

-    }

-

-    void DefaultMQPushConsumer::persistConsumerOffsetByResetOffset() {

-        if (isServiceStateOk()) {

-            m_pRebalance->persistConsumerOffsetByResetOffset();

-        }

-    }

-

-    void DefaultMQPushConsumer::start() {

-#ifndef WIN32

-        /* Ignore the SIGPIPE */

-        struct sigaction sa = {0};

-        sa.sa_handler = SIG_IGN;

-        sa.sa_flags = 0;

-        sigaction(SIGPIPE, &sa, 0);

-#endif

-        switch (m_serviceState) {

-            case CREATE_JUST: {

-                m_serviceState = START_FAILED;

-                MQClient::start();

-                LOG_INFO("DefaultMQPushConsumer:%s start", m_GroupName.c_str());

-

-                //<!data;

-                checkConfig();

-

-                //<!create rebalance;

-                m_pRebalance = new RebalancePush(this, getFactory());

-

-                string groupname = getGroupName();

-                m_pPullAPIWrapper = new PullAPIWrapper(getFactory(), groupname);

-

-                if (m_pMessageListener) {

-                    if (m_pMessageListener->getMessageListenerType() ==

-                        messageListenerOrderly) {

-                        LOG_INFO("start orderly consume service:%s", getGroupName().c_str());

-                        m_consumerService = new ConsumeMessageOrderlyService(

-                                this, m_consumeThreadCount, m_pMessageListener);

-                    } else  // for backward compatible, defaultly and concurrently listeners

-                        // are allocating ConsumeMessageConcurrentlyService

-                    {

-                        LOG_INFO("start concurrently consume service:%s",

-                                 getGroupName().c_str());

-                        m_consumerService = new ConsumeMessageConcurrentlyService(

-                                this, m_consumeThreadCount, m_pMessageListener);

-                    }

-                }

-

-                m_pullmsgQueue = new TaskQueue(m_pullMsgThreadPoolNum);

-                m_pullmsgThread.reset(new boost::thread(boost::bind(

-                        &DefaultMQPushConsumer::runPullMsgQueue, this, m_pullmsgQueue)));

-

-                copySubscription();

-

-                //<! registe;

-                bool registerOK = getFactory()->registerConsumer(this);

-                if (!registerOK) {

-                    m_serviceState = CREATE_JUST;

-                    THROW_MQEXCEPTION(

-                            MQClientException,

-                            "The cousumer group[" + getGroupName() +

-                            "] has been created before, specify another name please.",

-                            -1);

-                }

-

-                //<!msg model;

-                switch (getMessageModel()) {

-                    case BROADCASTING:

-                        m_pOffsetStore = new LocalFileOffsetStore(groupname, getFactory());

-                        break;

-                    case CLUSTERING:

-                        m_pOffsetStore = new RemoteBrokerOffsetStore(groupname, getFactory());

-                        break;

-                }

-                bool bStartFailed = false;

-                string errorMsg;

-                try {

-                    m_pOffsetStore->load();

-                } catch (MQClientException &e) {

-                    bStartFailed = true;

-                    errorMsg = std::string(e.what());

-                }

-                m_consumerService->start();

-

-                getFactory()->start();

-

-                updateTopicSubscribeInfoWhenSubscriptionChanged();

-                getFactory()->sendHeartbeatToAllBroker();

-

-                m_serviceState = RUNNING;

-                if (bStartFailed) {

-                    shutdown();

-                    THROW_MQEXCEPTION(MQClientException, errorMsg, -1);

-                }

-                break;

-            }

-            case RUNNING:

-            case START_FAILED:

-            case SHUTDOWN_ALREADY:

-                break;

-            default:

-                break;

-        }

-

-        getFactory()->rebalanceImmediately();

-    }

-

-    void DefaultMQPushConsumer::shutdown() {

-        switch (m_serviceState) {

-            case RUNNING: {

-                LOG_INFO("DefaultMQPushConsumer shutdown");

-                m_async_ioService.stop();

-                m_async_service_thread->interrupt();

-                m_async_service_thread->join();

-                m_pullmsgQueue->close();

-                m_pullmsgThread->interrupt();

-                m_pullmsgThread->join();

-                m_consumerService->shutdown();

-                persistConsumerOffset();

-                shutdownAsyncPullCallBack();  // delete aync pullMsg resources

-                getFactory()->unregisterConsumer(this);

-                getFactory()->shutdown();

-                m_serviceState = SHUTDOWN_ALREADY;

-                break;

-            }

-            case CREATE_JUST:

-            case SHUTDOWN_ALREADY:

-                break;

-            default:

-                break;

-        }

-    }

-

-    void DefaultMQPushConsumer::registerMessageListener(

-            MQMessageListener *pMessageListener) {

-        if (NULL != pMessageListener) {

-            m_pMessageListener = pMessageListener;

-        }

-    }

-

-    MessageListenerType DefaultMQPushConsumer::getMessageListenerType() {

-        if (NULL != m_pMessageListener) {

-            return m_pMessageListener->getMessageListenerType();

-        }

-        return messageListenerDefaultly;

-    }

-

-    ConsumeMsgService *DefaultMQPushConsumer::getConsumerMsgService() const {

-        return m_consumerService;

-    }

-

-    OffsetStore *DefaultMQPushConsumer::getOffsetStore() const {

-        return m_pOffsetStore;

-    }

-

-    Rebalance *DefaultMQPushConsumer::getRebalance() const { return m_pRebalance; }

-

-    void DefaultMQPushConsumer::subscribe(const string &topic,

-                                          const string &subExpression) {

-        m_subTopics[topic] = subExpression;

-    }

-

-    void DefaultMQPushConsumer::checkConfig() {

-        string groupname = getGroupName();

-        // check consumerGroup

-        Validators::checkGroup(groupname);

-

-        // consumerGroup

-        if (!groupname.compare(DEFAULT_CONSUMER_GROUP)) {

-            THROW_MQEXCEPTION(MQClientException,

-                              "consumerGroup can not equal DEFAULT_CONSUMER", -1);

-        }

-

-        if (getMessageModel() != BROADCASTING && getMessageModel() != CLUSTERING) {

-            THROW_MQEXCEPTION(MQClientException, "messageModel is valid ", -1);

-        }

-

-        if (m_pMessageListener == NULL) {

-            THROW_MQEXCEPTION(MQClientException, "messageListener is null ", -1);

-        }

-    }

-

-    void DefaultMQPushConsumer::copySubscription() {

-        map<string, string>::iterator it = m_subTopics.begin();

-        for (; it != m_subTopics.end(); ++it) {

-            LOG_INFO("buildSubscriptionData,:%s,%s", it->first.c_str(),

-                     it->second.c_str());

-            unique_ptr<SubscriptionData> pSData(

-                    FilterAPI::buildSubscriptionData(it->first, it->second));

-

-            m_pRebalance->setSubscriptionData(it->first, pSData.release());

-        }

-

-        switch (getMessageModel()) {

-            case BROADCASTING:

-                break;

-            case CLUSTERING: {

-                string retryTopic = UtilAll::getRetryTopic(getGroupName());

-

-                //<!this sub;

-                unique_ptr<SubscriptionData> pSData(

-                        FilterAPI::buildSubscriptionData(retryTopic, SUB_ALL));

-

-                m_pRebalance->setSubscriptionData(retryTopic, pSData.release());

-                break;

-            }

-            default:

-                break;

-        }

-    }

-

-    void DefaultMQPushConsumer::updateTopicSubscribeInfo(

-            const string &topic, vector<MQMessageQueue> &info) {

-        m_pRebalance->setTopicSubscribeInfo(topic, info);

-    }

-

-    void DefaultMQPushConsumer::updateTopicSubscribeInfoWhenSubscriptionChanged() {

-        map<string, SubscriptionData *> &subTable =

-                m_pRebalance->getSubscriptionInner();

-        map<string, SubscriptionData *>::iterator it = subTable.begin();

-        for (; it != subTable.end(); ++it) {

-            bool btopic = getFactory()->updateTopicRouteInfoFromNameServer(

-                    it->first, getSessionCredentials());

-            if (btopic == false) {

-                LOG_WARN("The topic:[%s] not exist", it->first.c_str());

-            }

-        }

-    }

-

-    ConsumeType DefaultMQPushConsumer::getConsumeType() {

-        return CONSUME_PASSIVELY;

-    }

-

-    ConsumeFromWhere DefaultMQPushConsumer::getConsumeFromWhere() {

-        return m_consumeFromWhere;

-    }

-

-    void DefaultMQPushConsumer::setConsumeFromWhere(

-            ConsumeFromWhere consumeFromWhere) {

-        m_consumeFromWhere = consumeFromWhere;

-    }

-

-    void DefaultMQPushConsumer::getSubscriptions(vector<SubscriptionData> &result) {

-        map<string, SubscriptionData *> &subTable =

-                m_pRebalance->getSubscriptionInner();

-        map<string, SubscriptionData *>::iterator it = subTable.begin();

-        for (; it != subTable.end(); ++it) {

-            result.push_back(*(it->second));

-        }

-    }

-

-    void DefaultMQPushConsumer::updateConsumeOffset(const MQMessageQueue &mq,

-                                                    int64 offset) {

-        if (offset >= 0) {

-            m_pOffsetStore->updateOffset(mq, offset);

-        } else {

-            LOG_ERROR("updateConsumeOffset of mq:%s error", mq.toString().c_str());

-        }

-    }

-

-    void DefaultMQPushConsumer::removeConsumeOffset(const MQMessageQueue &mq) {

-        m_pOffsetStore->removeOffset(mq);

-    }

-

-    void DefaultMQPushConsumer::triggerNextPullRequest(

-            boost::asio::deadline_timer *t, PullRequest *request) {

-        // LOG_INFO("trigger pullrequest for:%s",

-        // (request->m_messageQueue).toString().c_str());

-        producePullMsgTask(request);

-        deleteAndZero(t);

-    }

-

-    void DefaultMQPushConsumer::producePullMsgTask(PullRequest *request) {

-        if (m_pullmsgQueue->bTaskQueueStatusOK() && isServiceStateOk()) {

-            request->addPullMsgEvent();

-            if (m_asyncPull) {

-                m_pullmsgQueue->produce(TaskBinder::gen(

-                        &DefaultMQPushConsumer::pullMessageAsync, this, request));

-            } else {

-                m_pullmsgQueue->produce(

-                        TaskBinder::gen(&DefaultMQPushConsumer::pullMessage, this, request));

-            }

-        } else {

-            LOG_WARN("produce pullmsg of mq:%s failed",

-                     request->m_messageQueue.toString().c_str());

-        }

-    }

-

-    void DefaultMQPushConsumer::runPullMsgQueue(TaskQueue *pTaskQueue) {

-        pTaskQueue->run();

-    }

-

-    void DefaultMQPushConsumer::pullMessage(PullRequest *request) {

-        if (request == NULL) {

-            LOG_ERROR("Pull request is NULL, return");

-            return;

-        }

-        if (request->isDroped()) {

-            LOG_WARN("Pull request is set drop with mq:%s, return",

-                     (request->m_messageQueue).toString().c_str());

-            request->removePullMsgEvent();

-            return;

-        }

-

-        MQMessageQueue &messageQueue = request->m_messageQueue;

-        if (m_consumerService->getConsumeMsgSerivceListenerType() ==

-            messageListenerOrderly) {

-            if (!request->isLocked() || request->isLockExpired()) {

-                if (!m_pRebalance->lock(messageQueue)) {

-                    producePullMsgTask(request);

-                    return;

-                }

-            }

-        }

-

-        if (request->getCacheMsgCount() > m_maxMsgCacheSize) {

-            // LOG_INFO("retry pullrequest for:%s after 1s, as cachMsgSize:%d is larger

-            // than:%d",  (request->m_messageQueue).toString().c_str(),

-            // request->getCacheMsgCount(), m_maxMsgCacheSize);

-            boost::asio::deadline_timer *t = new boost::asio::deadline_timer(

-                    m_async_ioService, boost::posix_time::milliseconds(1 * 1000));

-            t->async_wait(boost::bind(&DefaultMQPushConsumer::triggerNextPullRequest,

-                                      this, t, request));

-            return;

-        }

-

-        bool commitOffsetEnable = false;

-        int64 commitOffsetValue = 0;

-        if (CLUSTERING == getMessageModel()) {

-            commitOffsetValue = m_pOffsetStore->readOffset(

-                    messageQueue, READ_FROM_MEMORY, getSessionCredentials());

-            if (commitOffsetValue > 0) {

-                commitOffsetEnable = true;

-            }

-        }

-

-        string subExpression;

-        SubscriptionData *pSdata =

-                m_pRebalance->getSubscriptionData(messageQueue.getTopic());

-        if (pSdata == NULL) {

-            producePullMsgTask(request);

-            return;

-        }

-        subExpression = pSdata->getSubString();

-

-        int sysFlag =

-                PullSysFlag::buildSysFlag(commitOffsetEnable,      // commitOffset

-                                          false,                   // suspend

-                                          !subExpression.empty(),  // subscription

-                                          false);                  // class filter

-

-        try {

-            request->setLastPullTimestamp(UtilAll::currentTimeMillis());

-            unique_ptr<PullResult> result(

-                    m_pPullAPIWrapper->pullKernelImpl(messageQueue,              // 1

-                                                      subExpression,             // 2

-                                                      pSdata->getSubVersion(),   // 3

-                                                      request->getNextOffset(),  // 4

-                                                      32,                        // 5

-                                                      sysFlag,                   // 6

-                                                      commitOffsetValue,         // 7

-                                                      1000 * 15,                 // 8

-                                                      1000 * 30,                 // 9

-                                                      ComMode_SYNC,              // 10

-                                                      NULL, getSessionCredentials()));

-

-            PullResult pullResult = m_pPullAPIWrapper->processPullResult(

-                    messageQueue, result.get(), pSdata);

-

-            switch (pullResult.pullStatus) {

-                case FOUND: {

-                    if (!request->isDroped())  // if request is setted to dropped, don't add

-                        // msgFoundList to m_msgTreeMap and don't

-                        // call producePullMsgTask

-                    {  // avoid issue: pullMsg is sent out, rebalance is doing concurrently

-                        // and this request is dropped, and then received pulled msgs.

-                        request->setNextOffset(pullResult.nextBeginOffset);

-                        request->putMessage(pullResult.msgFoundList);

-

-                        m_consumerService->submitConsumeRequest(request,

-                                                                pullResult.msgFoundList);

-                        producePullMsgTask(request);

-

-                        LOG_DEBUG("FOUND:%s with size:"

-                                          SIZET_FMT

-                                          ",nextBeginOffset:%lld",

-                                  messageQueue.toString().c_str(),

-                                  pullResult.msgFoundList.size(), pullResult.nextBeginOffset);

-                    } else {

-                        request->removePullMsgEvent();

-                    }

-                    break;

-                }

-                case NO_NEW_MSG: {

-                    request->setNextOffset(pullResult.nextBeginOffset);

-                    vector<MQMessageExt> msgs;

-                    request->getMessage(msgs);

-                    if ((msgs.size() == 0) && (pullResult.nextBeginOffset > 0)) {

-                        /*if broker losted/cleared msgs of one msgQueue, but the brokerOffset

-                        is kept, then consumer will enter following situation:

-                        1>. get pull offset with 0 when do rebalance, and set

-                        m_offsetTable[mq] to 0;

-                        2>. NO_NEW_MSG or NO_MATCHED_MSG got when pullMessage, and nextBegin

-                        offset increase by 800

-                        3>. request->getMessage(msgs) always NULL

-                        4>. we need update consumerOffset to nextBeginOffset indicated by

-                        broker

-                        but if really no new msg could be pulled, also go to this CASE

-                     */

-                        // LOG_DEBUG("maybe misMatch between broker and client happens, update

-                        // consumerOffset to nextBeginOffset indicated by broker");

-                        updateConsumeOffset(messageQueue, pullResult.nextBeginOffset);

-                    }

-                    producePullMsgTask(request);

-                    LOG_DEBUG("NO_NEW_MSG:%s,nextBeginOffset:%lld",

-                              messageQueue.toString().c_str(), pullResult.nextBeginOffset);

-                    break;

-                }

-                case NO_MATCHED_MSG: {

-                    request->setNextOffset(pullResult.nextBeginOffset);

-                    vector<MQMessageExt> msgs;

-                    request->getMessage(msgs);

-                    if ((msgs.size() == 0) && (pullResult.nextBeginOffset > 0)) {

-                        // LOG_DEBUG("maybe misMatch between broker and client happens, update

-                        // consumerOffset to nextBeginOffset indicated by broker");

-                        updateConsumeOffset(messageQueue, pullResult.nextBeginOffset);

-                    }

-                    producePullMsgTask(request);

-

-                    LOG_DEBUG("NO_MATCHED_MSG:%s,nextBeginOffset:%lld",

-                              messageQueue.toString().c_str(), pullResult.nextBeginOffset);

-                    break;

-                }

-                case OFFSET_ILLEGAL: {

-                    request->setNextOffset(pullResult.nextBeginOffset);

-                    producePullMsgTask(request);

-

-                    LOG_DEBUG("OFFSET_ILLEGAL:%s,nextBeginOffset:%lld",

-                              messageQueue.toString().c_str(), pullResult.nextBeginOffset);

-                    break;

-                }

-                case BROKER_TIMEOUT: {  // as BROKER_TIMEOUT is defined by client, broker

-                    // will not returns this status, so this case

-                    // could not be entered.

-                    LOG_ERROR("impossible BROKER_TIMEOUT Occurs");

-                    request->setNextOffset(pullResult.nextBeginOffset);

-                    producePullMsgTask(request);

-                    break;

-                }

-            }

-        } catch (MQException &e) {

-            LOG_ERROR(e.what());

-            producePullMsgTask(request);

-        }

-    }

-

-    AsyncPullCallback *DefaultMQPushConsumer::getAsyncPullCallBack(

-            PullRequest *request, MQMessageQueue msgQueue) {

-        boost::lock_guard<boost::mutex> lock(m_asyncCallbackLock);

-        if (m_asyncPull && request) {

-            PullMAP::iterator it = m_PullCallback.find(msgQueue);

-            if (it == m_PullCallback.end()) {

-                LOG_INFO("new pull callback for mq:%s", msgQueue.toString().c_str());

-                m_PullCallback[msgQueue] = new AsyncPullCallback(this, request);

-            }

-            return m_PullCallback[msgQueue];

-        }

-

-        return NULL;

-    }

-

-    void DefaultMQPushConsumer::shutdownAsyncPullCallBack() {

-        boost::lock_guard<boost::mutex> lock(m_asyncCallbackLock);

-        if (m_asyncPull) {

-            PullMAP::iterator it = m_PullCallback.begin();

-            for (; it != m_PullCallback.end(); ++it) {

-                if (it->second) {

-                    it->second->setShutdownStatus();

-                } else {

-                    LOG_ERROR("could not find asyncPullCallback for:%s",

-                              it->first.toString().c_str());

-                }

-            }

-        }

-    }

-

-    void DefaultMQPushConsumer::pullMessageAsync(PullRequest *request) {

-        if (request == NULL) {

-            LOG_ERROR("Pull request is NULL, return");

-            return;

-        }

-        if (request->isDroped()) {

-            LOG_WARN("Pull request is set drop with mq:%s, return",

-                     (request->m_messageQueue).toString().c_str());

-            request->removePullMsgEvent();

-            return;

-        }

-

-        MQMessageQueue &messageQueue = request->m_messageQueue;

-        if (m_consumerService->getConsumeMsgSerivceListenerType() ==

-            messageListenerOrderly) {

-            if (!request->isLocked() || request->isLockExpired()) {

-                if (!m_pRebalance->lock(messageQueue)) {

-                    producePullMsgTask(request);

-                    return;

-                }

-            }

-        }

-

-        if (request->getCacheMsgCount() > m_maxMsgCacheSize) {

-            // LOG_INFO("retry pullrequest for:%s after 1s, as cachMsgSize:%d is larger

-            // than:%d",  (request->m_messageQueue).toString().c_str(),

-            // request->getCacheMsgCount(), m_maxMsgCacheSize);

-            boost::asio::deadline_timer *t = new boost::asio::deadline_timer(

-                    m_async_ioService, boost::posix_time::milliseconds(1 * 1000));

-            t->async_wait(boost::bind(&DefaultMQPushConsumer::triggerNextPullRequest,

-                                      this, t, request));

-            return;

-        }

-

-        bool commitOffsetEnable = false;

-        int64 commitOffsetValue = 0;

-        if (CLUSTERING == getMessageModel()) {

-            commitOffsetValue = m_pOffsetStore->readOffset(

-                    messageQueue, READ_FROM_MEMORY, getSessionCredentials());

-            if (commitOffsetValue > 0) {

-                commitOffsetEnable = true;

-            }

-        }

-

-        string subExpression;

-        SubscriptionData *pSdata =

-                (m_pRebalance->getSubscriptionData(messageQueue.getTopic()));

-        if (pSdata == NULL) {

-            producePullMsgTask(request);

-            return;

-        }

-        subExpression = pSdata->getSubString();

-

-        int sysFlag =

-                PullSysFlag::buildSysFlag(commitOffsetEnable,      // commitOffset

-                                          true,                    // suspend

-                                          !subExpression.empty(),  // subscription

-                                          false);                  // class filter

-

-        AsyncArg arg;

-        arg.mq = messageQueue;

-        arg.subData = *pSdata;

-        arg.pPullWrapper = m_pPullAPIWrapper;

-

-        try {

-            request->setLastPullTimestamp(UtilAll::currentTimeMillis());

-            m_pPullAPIWrapper->pullKernelImpl(

-                    messageQueue,                                 // 1

-                    subExpression,                                // 2

-                    pSdata->getSubVersion(),                      // 3

-                    request->getNextOffset(),                     // 4

-                    32,                                           // 5

-                    sysFlag,                                      // 6

-                    commitOffsetValue,                            // 7

-                    1000 * 15,                                    // 8

-                    m_asyncPullTimeout,                           // 9

-                    ComMode_ASYNC,                                // 10

-                    getAsyncPullCallBack(request, messageQueue),  // 11

-                    getSessionCredentials(),                      // 12

-                    &arg);                                        // 13

-        } catch (MQException &e) {

-            LOG_ERROR(e.what());

-            producePullMsgTask(request);

-        }

-    }

-

-    void DefaultMQPushConsumer::setAsyncPull(bool asyncFlag) {

-        if (asyncFlag) {

-            LOG_INFO("set pushConsumer:%s to async default pull mode",

-                     getGroupName().c_str());

-        } else {

-            LOG_INFO("set pushConsumer:%s to sync pull mode", getGroupName().c_str());

-        }

-        m_asyncPull = asyncFlag;

-    }

-

-    void DefaultMQPushConsumer::setConsumeThreadCount(int threadCount) {

-        if (threadCount > 0) {

-            m_consumeThreadCount = threadCount;

-        } else {

-            LOG_ERROR("setConsumeThreadCount with invalid value");

-        }

-    }

-

-    int DefaultMQPushConsumer::getConsumeThreadCount() const {

-        return m_consumeThreadCount;

-    }

-

-    void DefaultMQPushConsumer::setPullMsgThreadPoolCount(int threadCount) {

-        m_pullMsgThreadPoolNum = threadCount;

-    }

-

-    int DefaultMQPushConsumer::getPullMsgThreadPoolCount() const {

-        return m_pullMsgThreadPoolNum;

-    }

-

-    int DefaultMQPushConsumer::getConsumeMessageBatchMaxSize() const {

-        return m_consumeMessageBatchMaxSize;

-    }

-

-    void DefaultMQPushConsumer::setConsumeMessageBatchMaxSize(

-            int consumeMessageBatchMaxSize) {

-        if (consumeMessageBatchMaxSize >= 1)

-            m_consumeMessageBatchMaxSize = consumeMessageBatchMaxSize;

-    }

-

-    void DefaultMQPushConsumer::setMaxCacheMsgSizePerQueue(int maxCacheSize) {

-        if (maxCacheSize > 0 && maxCacheSize < 65535) {

-            LOG_INFO("set maxCacheSize to:%d for consumer:%s", maxCacheSize,

-                     getGroupName().c_str());

-            m_maxMsgCacheSize = maxCacheSize;

-        }

-    }

-

-    int DefaultMQPushConsumer::getMaxCacheMsgSizePerQueue() const {

-        return m_maxMsgCacheSize;

-    }

-

-    ConsumerRunningInfo *DefaultMQPushConsumer::getConsumerRunningInfo() {

-        ConsumerRunningInfo *info = new ConsumerRunningInfo();

-        if (info) {

-            if (m_consumerService->getConsumeMsgSerivceListenerType() ==

-                messageListenerOrderly)

-                info->setProperty(ConsumerRunningInfo::PROP_CONSUME_ORDERLY, "true");

-            else

-                info->setProperty(ConsumerRunningInfo::PROP_CONSUME_ORDERLY, "flase");

-            info->setProperty(ConsumerRunningInfo::PROP_THREADPOOL_CORE_SIZE,

-                              UtilAll::to_string(m_consumeThreadCount));

-            info->setProperty(ConsumerRunningInfo::PROP_CONSUMER_START_TIMESTAMP,

-                              UtilAll::to_string(m_startTime));

-

-            vector<SubscriptionData> result;

-            getSubscriptions(result);

-            info->setSubscriptionSet(result);

-

-            map<MQMessageQueue, PullRequest *> requestTable =

-                    m_pRebalance->getPullRequestTable();

-            map<MQMessageQueue, PullRequest *>::iterator it = requestTable.begin();

-

-            for (; it != requestTable.end(); ++it) {

-                if (!it->second->isDroped()) {

-                    map<MessageQueue, ProcessQueueInfo> queueTable;

-                    MessageQueue queue((it->first).getTopic(), (it->first).getBrokerName(),

-                                       (it->first).getQueueId());

-                    ProcessQueueInfo processQueue;

-                    processQueue.cachedMsgMinOffset = it->second->getCacheMinOffset();

-                    processQueue.cachedMsgMaxOffset = it->second->getCacheMaxOffset();

-                    processQueue.cachedMsgCount = it->second->getCacheMsgCount();

-                    processQueue.setCommitOffset(m_pOffsetStore->readOffset(

-                            it->first, MEMORY_FIRST_THEN_STORE, getSessionCredentials()));

-                    processQueue.setDroped(it->second->isDroped());

-                    processQueue.setLocked(it->second->isLocked());

-                    processQueue.lastLockTimestamp = it->second->getLastLockTimestamp();

-                    processQueue.lastPullTimestamp = it->second->getLastPullTimestamp();

-                    processQueue.lastConsumeTimestamp =

-                            it->second->getLastConsumeTimestamp();

-                    info->setMqTable(queue, processQueue);

-                }

-            }

-

-            return info;

-        }

-        return NULL;

-    }

-

-//<!************************************************************************

-}  //<!end namespace;

+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "DefaultMQPushConsumer.h"
+#include "CommunicationMode.h"
+#include "ConsumeMsgService.h"
+#include "ConsumerRunningInfo.h"
+#include "FilterAPI.h"
+#include "Logging.h"
+#include "MQClientAPIImpl.h"
+#include "MQClientFactory.h"
+#include "MQClientManager.h"
+#include "MQProtos.h"
+#include "OffsetStore.h"
+#include "PullAPIWrapper.h"
+#include "PullSysFlag.h"
+#include "Rebalance.h"
+#include "UtilAll.h"
+#include "Validators.h"
+#include "task_queue.h"
+
+namespace rocketmq {
+
+    class AsyncPullCallback : public PullCallback {
+    public:
+        AsyncPullCallback(DefaultMQPushConsumer *pushConsumer, PullRequest *request)
+                : m_callbackOwner(pushConsumer),
+                  m_pullRequest(request),
+                  m_bShutdown(false) {}
+
+        virtual ~AsyncPullCallback() {
+            m_callbackOwner = NULL;
+            m_pullRequest = NULL;
+        }
+
+        virtual void onSuccess(MQMessageQueue &mq, PullResult &result,
+                               bool bProducePullRequest) {
+            if (m_bShutdown == true) {
+                LOG_INFO("pullrequest for:%s in shutdown, return",
+                         (m_pullRequest->m_messageQueue).toString().c_str());
+                m_pullRequest->removePullMsgEvent();
+                return;
+            }
+
+            switch (result.pullStatus) {
+                case FOUND: {
+                    if (!m_pullRequest->isDroped())  // if request is setted to dropped,
+                        // don't add msgFoundList to
+                        // m_msgTreeMap and don't call
+                        // producePullMsgTask
+                    {  // avoid issue: pullMsg is sent out, rebalance is doing concurrently
+                        // and this request is dropped, and then received pulled msgs.
+                        m_pullRequest->setNextOffset(result.nextBeginOffset);
+                        m_pullRequest->putMessage(result.msgFoundList);
+
+                        m_callbackOwner->getConsumerMsgService()->submitConsumeRequest(
+                                m_pullRequest, result.msgFoundList);
+
+                        if (bProducePullRequest)
+                            m_callbackOwner->producePullMsgTask(m_pullRequest);
+                        else
+                            m_pullRequest->removePullMsgEvent();
+
+                        LOG_DEBUG("FOUND:%s with size:"
+                                          SIZET_FMT
+                                          ", nextBeginOffset:%lld",
+                                  (m_pullRequest->m_messageQueue).toString().c_str(),
+                                  result.msgFoundList.size(), result.nextBeginOffset);
+                    } else {
+                        LOG_INFO("remove pullmsg event of mq:%s",
+                                 (m_pullRequest->m_messageQueue).toString().c_str());
+                        m_pullRequest->removePullMsgEvent();
+                    }
+                    break;
+                }
+                case NO_NEW_MSG: {
+                    m_pullRequest->setNextOffset(result.nextBeginOffset);
+
+                    vector<MQMessageExt> msgs;
+                    m_pullRequest->getMessage(msgs);
+                    if ((msgs.size() == 0) && (result.nextBeginOffset > 0)) {
+                        /*if broker losted/cleared msgs of one msgQueue, but the brokerOffset
+                        is kept, then consumer will enter following situation:
+                        1>. get pull offset with 0 when do rebalance, and set
+                        m_offsetTable[mq] to 0;
+                        2>. NO_NEW_MSG or NO_MATCHED_MSG got when pullMessage, and nextBegin
+                        offset increase by 800
+                        3>. request->getMessage(msgs) always NULL
+                        4>. we need update consumerOffset to nextBeginOffset indicated by
+                        broker
+                        but if really no new msg could be pulled, also go to this CASE
+
+                        LOG_INFO("maybe misMatch between broker and client happens, update
+                        consumerOffset to nextBeginOffset indicated by broker");*/
+                        m_callbackOwner->updateConsumeOffset(m_pullRequest->m_messageQueue,
+                                                             result.nextBeginOffset);
+                    }
+                    if (bProducePullRequest)
+                        m_callbackOwner->producePullMsgTask(m_pullRequest);
+                    else
+                        m_pullRequest->removePullMsgEvent();
+
+                    /*LOG_INFO("NO_NEW_MSG:%s,nextBeginOffset:%lld",
+                             (m_pullRequest->m_messageQueue).toString().c_str(),
+                             result.nextBeginOffset);*/
+                    break;
+                }
+                case NO_MATCHED_MSG: {
+                    m_pullRequest->setNextOffset(result.nextBeginOffset);
+
+                    vector<MQMessageExt> msgs;
+                    m_pullRequest->getMessage(msgs);
+                    if ((msgs.size() == 0) && (result.nextBeginOffset > 0)) {
+                        /*if broker losted/cleared msgs of one msgQueue, but the brokerOffset
+                        is kept, then consumer will enter following situation:
+                        1>. get pull offset with 0 when do rebalance, and set
+                        m_offsetTable[mq] to 0;
+                        2>. NO_NEW_MSG or NO_MATCHED_MSG got when pullMessage, and nextBegin
+                        offset increase by 800
+                        3>. request->getMessage(msgs) always NULL
+                        4>. we need update consumerOffset to nextBeginOffset indicated by
+                        broker
+                        but if really no new msg could be pulled, also go to this CASE
+
+                        LOG_INFO("maybe misMatch between broker and client happens, update
+                        consumerOffset to nextBeginOffset indicated by broker");*/
+                        m_callbackOwner->updateConsumeOffset(m_pullRequest->m_messageQueue,
+                                                             result.nextBeginOffset);
+                    }
+                    if (bProducePullRequest)
+                        m_callbackOwner->producePullMsgTask(m_pullRequest);
+                    else
+                        m_pullRequest->removePullMsgEvent();
+                    /*LOG_INFO("NO_MATCHED_MSG:%s,nextBeginOffset:%lld",
+                             (m_pullRequest->m_messageQueue).toString().c_str(),
+                             result.nextBeginOffset);*/
+                    break;
+                }
+                case OFFSET_ILLEGAL: {
+                    m_pullRequest->setNextOffset(result.nextBeginOffset);
+                    if (bProducePullRequest)
+                        m_callbackOwner->producePullMsgTask(m_pullRequest);
+                    else
+                        m_pullRequest->removePullMsgEvent();
+
+                    /*LOG_INFO("OFFSET_ILLEGAL:%s,nextBeginOffset:%lld",
+                             (m_pullRequest->m_messageQueue).toString().c_str(),
+                             result.nextBeginOffset);*/
+                    break;
+                }
+                case BROKER_TIMEOUT: {  // as BROKER_TIMEOUT is defined by client, broker
+                    // will not returns this status, so this case
+                    // could not be entered.
+                    LOG_ERROR("impossible BROKER_TIMEOUT Occurs");
+                    m_pullRequest->setNextOffset(result.nextBeginOffset);
+                    if (bProducePullRequest)
+                        m_callbackOwner->producePullMsgTask(m_pullRequest);
+                    else
+                        m_pullRequest->removePullMsgEvent();
+                    break;
+                }
+            }
+        }
+
+        virtual void onException(MQException &e) {
+            if (m_bShutdown == true) {
+                LOG_INFO("pullrequest for:%s in shutdown, return",
+                         (m_pullRequest->m_messageQueue).toString().c_str());
+                m_pullRequest->removePullMsgEvent();
+                return;
+            }
+            LOG_WARN("pullrequest for:%s occurs exception, reproduce it",
+                     (m_pullRequest->m_messageQueue).toString().c_str());
+            m_callbackOwner->producePullMsgTask(m_pullRequest);
+        }
+
+        void setShutdownStatus() { m_bShutdown = true; }
+
+    private:
+        DefaultMQPushConsumer *m_callbackOwner;
+        PullRequest *m_pullRequest;
+        bool m_bShutdown;
+    };
+
+//<!***************************************************************************
+    static boost::mutex m_asyncCallbackLock;
+
+    DefaultMQPushConsumer::DefaultMQPushConsumer(const string &groupname)
+            : m_consumeFromWhere(CONSUME_FROM_LAST_OFFSET),
+              m_pOffsetStore(NULL),
+              m_pPullAPIWrapper(NULL),
+              m_pMessageListener(NULL),
+              m_consumeMessageBatchMaxSize(1),
+              m_maxMsgCacheSize(1000) {
+        //<!set default group name;
+        string gname = groupname.empty() ? DEFAULT_CONSUMER_GROUP : groupname;
+        setGroupName(gname);
+        m_asyncPull = true;
+        m_asyncPullTimeout = 30 * 1000;
+        setMessageModel(CLUSTERING);
+
+        m_startTime = UtilAll::currentTimeMillis();
+        m_consumeThreadCount = boost::thread::hardware_concurrency();
+        m_pullMsgThreadPoolNum = boost::thread::hardware_concurrency();
+        m_async_service_thread.reset(new boost::thread(
+                boost::bind(&DefaultMQPushConsumer::boost_asio_work, this)));
+    }
+
+    void DefaultMQPushConsumer::boost_asio_work() {
+        LOG_INFO("DefaultMQPushConsumer::boost asio async service runing");
+        boost::asio::io_service::work work(m_async_ioService);  // avoid async io
+        // service stops after
+        // first timer timeout
+        // callback
+        m_async_ioService.run();
+    }
+
+    DefaultMQPushConsumer::~DefaultMQPushConsumer() {
+        m_pMessageListener = NULL;
+        if (m_pullmsgQueue != NULL) {
+            deleteAndZero(m_pullmsgQueue);
+        }
+        if (m_pRebalance != NULL) {
+            deleteAndZero(m_pRebalance);
+        }
+        if (m_pOffsetStore != NULL) {
+            deleteAndZero(m_pOffsetStore);
+        }
+        if (m_pPullAPIWrapper != NULL) {
+            deleteAndZero(m_pPullAPIWrapper);
+        }
+        if (m_consumerService != NULL) {
+            deleteAndZero(m_consumerService);
+        }
+        PullMAP::iterator it = m_PullCallback.begin();
+        for (; it != m_PullCallback.end(); ++it) {
+            deleteAndZero(it->second);
+        }
+        m_PullCallback.clear();
+        m_subTopics.clear();
+    }
+
+    void DefaultMQPushConsumer::sendMessageBack(MQMessageExt &msg, int delayLevel) {
+        try {
+            getFactory()->getMQClientAPIImpl()->consumerSendMessageBack(
+                    msg, getGroupName(), delayLevel, 3000, getSessionCredentials());
+        } catch (MQException &e) {
+            LOG_ERROR(e.what());
+        }
+    }
+
+    void DefaultMQPushConsumer::fetchSubscribeMessageQueues(
+            const string &topic, vector<MQMessageQueue> &mqs) {
+        mqs.clear();
+        try {
+            getFactory()->fetchSubscribeMessageQueues(topic, mqs,
+                                                      getSessionCredentials());
+        } catch (MQException &e) {
+            LOG_ERROR(e.what());
+        }
+    }
+
+    void DefaultMQPushConsumer::doRebalance() {
+        if (isServiceStateOk()) {
+            try {
+                m_pRebalance->doRebalance();
+            } catch (MQException &e) {
+                LOG_ERROR(e.what());
+            }
+        }
+    }
+
+    void DefaultMQPushConsumer::persistConsumerOffset() {
+        if (isServiceStateOk()) {
+            m_pRebalance->persistConsumerOffset();
+        }
+    }
+
+    void DefaultMQPushConsumer::persistConsumerOffsetByResetOffset() {
+        if (isServiceStateOk()) {
+            m_pRebalance->persistConsumerOffsetByResetOffset();
+        }
+    }
+
+    void DefaultMQPushConsumer::start() {
+#ifndef WIN32
+        /* Ignore the SIGPIPE */
+        struct sigaction sa;
+        memset(&sa,0, sizeof(struct sigaction));
+        sa.sa_handler = SIG_IGN;
+        sa.sa_flags = 0;
+        sigaction(SIGPIPE, &sa, 0);
+#endif
+        switch (m_serviceState) {
+            case CREATE_JUST: {
+                m_serviceState = START_FAILED;
+                MQClient::start();
+                LOG_INFO("DefaultMQPushConsumer:%s start", m_GroupName.c_str());
+
+                //<!data;
+                checkConfig();
+
+                //<!create rebalance;
+                m_pRebalance = new RebalancePush(this, getFactory());
+
+                string groupname = getGroupName();
+                m_pPullAPIWrapper = new PullAPIWrapper(getFactory(), groupname);
+
+                if (m_pMessageListener) {
+                    if (m_pMessageListener->getMessageListenerType() ==
+                        messageListenerOrderly) {
+                        LOG_INFO("start orderly consume service:%s", getGroupName().c_str());
+                        m_consumerService = new ConsumeMessageOrderlyService(
+                                this, m_consumeThreadCount, m_pMessageListener);
+                    } else  // for backward compatible, defaultly and concurrently listeners
+                        // are allocating ConsumeMessageConcurrentlyService
+                    {
+                        LOG_INFO("start concurrently consume service:%s",
+                                 getGroupName().c_str());
+                        m_consumerService = new ConsumeMessageConcurrentlyService(
+                                this, m_consumeThreadCount, m_pMessageListener);
+                    }
+                }
+
+                m_pullmsgQueue = new TaskQueue(m_pullMsgThreadPoolNum);
+                m_pullmsgThread.reset(new boost::thread(boost::bind(
+                        &DefaultMQPushConsumer::runPullMsgQueue, this, m_pullmsgQueue)));
+
+                copySubscription();
+
+                //<! registe;
+                bool registerOK = getFactory()->registerConsumer(this);
+                if (!registerOK) {
+                    m_serviceState = CREATE_JUST;
+                    THROW_MQEXCEPTION(
+                            MQClientException,
+                            "The cousumer group[" + getGroupName() +
+                            "] has been created before, specify another name please.",
+                            -1);
+                }
+
+                //<!msg model;
+                switch (getMessageModel()) {
+                    case BROADCASTING:
+                        m_pOffsetStore = new LocalFileOffsetStore(groupname, getFactory());
+                        break;
+                    case CLUSTERING:
+                        m_pOffsetStore = new RemoteBrokerOffsetStore(groupname, getFactory());
+                        break;
+                }
+                bool bStartFailed = false;
+                string errorMsg;
+                try {
+                    m_pOffsetStore->load();
+                } catch (MQClientException &e) {
+                    bStartFailed = true;
+                    errorMsg = std::string(e.what());
+                }
+                m_consumerService->start();
+
+                getFactory()->start();
+
+                updateTopicSubscribeInfoWhenSubscriptionChanged();
+                getFactory()->sendHeartbeatToAllBroker();
+
+                m_serviceState = RUNNING;
+                if (bStartFailed) {
+                    shutdown();
+                    THROW_MQEXCEPTION(MQClientException, errorMsg, -1);
+                }
+                break;
+            }
+            case RUNNING:
+            case START_FAILED:
+            case SHUTDOWN_ALREADY:
+                break;
+            default:
+                break;
+        }
+
+        getFactory()->rebalanceImmediately();
+    }
+
+    void DefaultMQPushConsumer::shutdown() {
+        switch (m_serviceState) {
+            case RUNNING: {
+                LOG_INFO("DefaultMQPushConsumer shutdown");
+                m_async_ioService.stop();
+                m_async_service_thread->interrupt();
+                m_async_service_thread->join();
+                m_pullmsgQueue->close();
+                m_pullmsgThread->interrupt();
+                m_pullmsgThread->join();
+                m_consumerService->shutdown();
+                persistConsumerOffset();
+                shutdownAsyncPullCallBack();  // delete aync pullMsg resources
+                getFactory()->unregisterConsumer(this);
+                getFactory()->shutdown();
+                m_serviceState = SHUTDOWN_ALREADY;
+                break;
+            }
+            case CREATE_JUST:
+            case SHUTDOWN_ALREADY:
+                break;
+            default:
+                break;
+        }
+    }
+
+    void DefaultMQPushConsumer::registerMessageListener(
+            MQMessageListener *pMessageListener) {
+        if (NULL != pMessageListener) {
+            m_pMessageListener = pMessageListener;
+        }
+    }
+
+    MessageListenerType DefaultMQPushConsumer::getMessageListenerType() {
+        if (NULL != m_pMessageListener) {
+            return m_pMessageListener->getMessageListenerType();
+        }
+        return messageListenerDefaultly;
+    }
+
+    ConsumeMsgService *DefaultMQPushConsumer::getConsumerMsgService() const {
+        return m_consumerService;
+    }
+
+    OffsetStore *DefaultMQPushConsumer::getOffsetStore() const {
+        return m_pOffsetStore;
+    }
+
+    Rebalance *DefaultMQPushConsumer::getRebalance() const { return m_pRebalance; }
+
+    void DefaultMQPushConsumer::subscribe(const string &topic,
+                                          const string &subExpression) {
+        m_subTopics[topic] = subExpression;
+    }
+
+    void DefaultMQPushConsumer::checkConfig() {
+        string groupname = getGroupName();
+        // check consumerGroup
+        Validators::checkGroup(groupname);
+
+        // consumerGroup
+        if (!groupname.compare(DEFAULT_CONSUMER_GROUP)) {
+            THROW_MQEXCEPTION(MQClientException,
+                              "consumerGroup can not equal DEFAULT_CONSUMER", -1);
+        }
+
+        if (getMessageModel() != BROADCASTING && getMessageModel() != CLUSTERING) {
+            THROW_MQEXCEPTION(MQClientException, "messageModel is valid ", -1);
+        }
+
+        if (m_pMessageListener == NULL) {
+            THROW_MQEXCEPTION(MQClientException, "messageListener is null ", -1);
+        }
+    }
+
+    void DefaultMQPushConsumer::copySubscription() {
+        map<string, string>::iterator it = m_subTopics.begin();
+        for (; it != m_subTopics.end(); ++it) {
+            LOG_INFO("buildSubscriptionData,:%s,%s", it->first.c_str(),
+                     it->second.c_str());
+            unique_ptr<SubscriptionData> pSData(
+                    FilterAPI::buildSubscriptionData(it->first, it->second));
+
+            m_pRebalance->setSubscriptionData(it->first, pSData.release());
+        }
+
+        switch (getMessageModel()) {
+            case BROADCASTING:
+                break;
+            case CLUSTERING: {
+                string retryTopic = UtilAll::getRetryTopic(getGroupName());
+
+                //<!this sub;
+                unique_ptr<SubscriptionData> pSData(
+                        FilterAPI::buildSubscriptionData(retryTopic, SUB_ALL));
+
+                m_pRebalance->setSubscriptionData(retryTopic, pSData.release());
+                break;
+            }
+            default:
+                break;
+        }
+    }
+
+    void DefaultMQPushConsumer::updateTopicSubscribeInfo(
+            const string &topic, vector<MQMessageQueue> &info) {
+        m_pRebalance->setTopicSubscribeInfo(topic, info);
+    }
+
+    void DefaultMQPushConsumer::updateTopicSubscribeInfoWhenSubscriptionChanged() {
+        map<string, SubscriptionData *> &subTable =
+                m_pRebalance->getSubscriptionInner();
+        map<string, SubscriptionData *>::iterator it = subTable.begin();
+        for (; it != subTable.end(); ++it) {
+            bool btopic = getFactory()->updateTopicRouteInfoFromNameServer(
+                    it->first, getSessionCredentials());
+            if (btopic == false) {
+                LOG_WARN("The topic:[%s] not exist", it->first.c_str());
+            }
+        }
+    }
+
+    ConsumeType DefaultMQPushConsumer::getConsumeType() {
+        return CONSUME_PASSIVELY;
+    }
+
+    ConsumeFromWhere DefaultMQPushConsumer::getConsumeFromWhere() {
+        return m_consumeFromWhere;
+    }
+
+    void DefaultMQPushConsumer::setConsumeFromWhere(
+            ConsumeFromWhere consumeFromWhere) {
+        m_consumeFromWhere = consumeFromWhere;
+    }
+
+    void DefaultMQPushConsumer::getSubscriptions(vector<SubscriptionData> &result) {
+        map<string, SubscriptionData *> &subTable =
+                m_pRebalance->getSubscriptionInner();
+        map<string, SubscriptionData *>::iterator it = subTable.begin();
+        for (; it != subTable.end(); ++it) {
+            result.push_back(*(it->second));
+        }
+    }
+
+    void DefaultMQPushConsumer::updateConsumeOffset(const MQMessageQueue &mq,
+                                                    int64 offset) {
+        if (offset >= 0) {
+            m_pOffsetStore->updateOffset(mq, offset);
+        } else {
+            LOG_ERROR("updateConsumeOffset of mq:%s error", mq.toString().c_str());
+        }
+    }
+
+    void DefaultMQPushConsumer::removeConsumeOffset(const MQMessageQueue &mq) {
+        m_pOffsetStore->removeOffset(mq);
+    }
+
+    void DefaultMQPushConsumer::triggerNextPullRequest(
+            boost::asio::deadline_timer *t, PullRequest *request) {
+        // LOG_INFO("trigger pullrequest for:%s",
+        // (request->m_messageQueue).toString().c_str());
+        producePullMsgTask(request);
+        deleteAndZero(t);
+    }
+
+    void DefaultMQPushConsumer::producePullMsgTask(PullRequest *request) {
+        if (m_pullmsgQueue->bTaskQueueStatusOK() && isServiceStateOk()) {
+            request->addPullMsgEvent();
+            if (m_asyncPull) {
+                m_pullmsgQueue->produce(TaskBinder::gen(
+                        &DefaultMQPushConsumer::pullMessageAsync, this, request));
+            } else {
+                m_pullmsgQueue->produce(
+                        TaskBinder::gen(&DefaultMQPushConsumer::pullMessage, this, request));
+            }
+        } else {
+            LOG_WARN("produce pullmsg of mq:%s failed",
+                     request->m_messageQueue.toString().c_str());
+        }
+    }
+
+    void DefaultMQPushConsumer::runPullMsgQueue(TaskQueue *pTaskQueue) {
+        pTaskQueue->run();
+    }
+
+    void DefaultMQPushConsumer::pullMessage(PullRequest *request) {
+        if (request == NULL) {
+            LOG_ERROR("Pull request is NULL, return");
+            return;
+        }
+        if (request->isDroped()) {
+            LOG_WARN("Pull request is set drop with mq:%s, return",
+                     (request->m_messageQueue).toString().c_str());
+            request->removePullMsgEvent();
+            return;
+        }
+
+        MQMessageQueue &messageQueue = request->m_messageQueue;
+        if (m_consumerService->getConsumeMsgSerivceListenerType() ==
+            messageListenerOrderly) {
+            if (!request->isLocked() || request->isLockExpired()) {
+                if (!m_pRebalance->lock(messageQueue)) {
+                    producePullMsgTask(request);
+                    return;
+                }
+            }
+        }
+
+        if (request->getCacheMsgCount() > m_maxMsgCacheSize) {
+            // LOG_INFO("retry pullrequest for:%s after 1s, as cachMsgSize:%d is larger
+            // than:%d",  (request->m_messageQueue).toString().c_str(),
+            // request->getCacheMsgCount(), m_maxMsgCacheSize);
+            boost::asio::deadline_timer *t = new boost::asio::deadline_timer(
+                    m_async_ioService, boost::posix_time::milliseconds(1 * 1000));
+            t->async_wait(boost::bind(&DefaultMQPushConsumer::triggerNextPullRequest,
+                                      this, t, request));
+            return;
+        }
+
+        bool commitOffsetEnable = false;
+        int64 commitOffsetValue = 0;
+        if (CLUSTERING == getMessageModel()) {
+            commitOffsetValue = m_pOffsetStore->readOffset(
+                    messageQueue, READ_FROM_MEMORY, getSessionCredentials());
+            if (commitOffsetValue > 0) {
+                commitOffsetEnable = true;
+            }
+        }
+
+        string subExpression;
+        SubscriptionData *pSdata =
+                m_pRebalance->getSubscriptionData(messageQueue.getTopic());
+        if (pSdata == NULL) {
+            producePullMsgTask(request);
+            return;
+        }
+        subExpression = pSdata->getSubString();
+
+        int sysFlag =
+                PullSysFlag::buildSysFlag(commitOffsetEnable,      // commitOffset
+                                          false,                   // suspend
+                                          !subExpression.empty(),  // subscription
+                                          false);                  // class filter
+
+        try {
+            request->setLastPullTimestamp(UtilAll::currentTimeMillis());
+            unique_ptr<PullResult> result(
+                    m_pPullAPIWrapper->pullKernelImpl(messageQueue,              // 1
+                                                      subExpression,             // 2
+                                                      pSdata->getSubVersion(),   // 3
+                                                      request->getNextOffset(),  // 4
+                                                      32,                        // 5
+                                                      sysFlag,                   // 6
+                                                      commitOffsetValue,         // 7
+                                                      1000 * 15,                 // 8
+                                                      1000 * 30,                 // 9
+                                                      ComMode_SYNC,              // 10
+                                                      NULL, getSessionCredentials()));
+
+            PullResult pullResult = m_pPullAPIWrapper->processPullResult(
+                    messageQueue, result.get(), pSdata);
+
+            switch (pullResult.pullStatus) {
+                case FOUND: {
+                    if (!request->isDroped())  // if request is setted to dropped, don't add
+                        // msgFoundList to m_msgTreeMap and don't
+                        // call producePullMsgTask
+                    {  // avoid issue: pullMsg is sent out, rebalance is doing concurrently
+                        // and this request is dropped, and then received pulled msgs.
+                        request->setNextOffset(pullResult.nextBeginOffset);
+                        request->putMessage(pullResult.msgFoundList);
+
+                        m_consumerService->submitConsumeRequest(request,
+                                                                pullResult.msgFoundList);
+                        producePullMsgTask(request);
+
+                        LOG_DEBUG("FOUND:%s with size:"
+                                          SIZET_FMT
+                                          ",nextBeginOffset:%lld",
+                                  messageQueue.toString().c_str(),
+                                  pullResult.msgFoundList.size(), pullResult.nextBeginOffset);
+                    } else {
+                        request->removePullMsgEvent();
+                    }
+                    break;
+                }
+                case NO_NEW_MSG: {
+                    request->setNextOffset(pullResult.nextBeginOffset);
+                    vector<MQMessageExt> msgs;
+                    request->getMessage(msgs);
+                    if ((msgs.size() == 0) && (pullResult.nextBeginOffset > 0)) {
+                        /*if broker losted/cleared msgs of one msgQueue, but the brokerOffset
+                        is kept, then consumer will enter following situation:
+                        1>. get pull offset with 0 when do rebalance, and set
+                        m_offsetTable[mq] to 0;
+                        2>. NO_NEW_MSG or NO_MATCHED_MSG got when pullMessage, and nextBegin
+                        offset increase by 800
+                        3>. request->getMessage(msgs) always NULL
+                        4>. we need update consumerOffset to nextBeginOffset indicated by
+                        broker
+                        but if really no new msg could be pulled, also go to this CASE
+                     */
+                        // LOG_DEBUG("maybe misMatch between broker and client happens, update
+                        // consumerOffset to nextBeginOffset indicated by broker");
+                        updateConsumeOffset(messageQueue, pullResult.nextBeginOffset);
+                    }
+                    producePullMsgTask(request);
+                    LOG_DEBUG("NO_NEW_MSG:%s,nextBeginOffset:%lld",
+                              messageQueue.toString().c_str(), pullResult.nextBeginOffset);
+                    break;
+                }
+                case NO_MATCHED_MSG: {
+                    request->setNextOffset(pullResult.nextBeginOffset);
+                    vector<MQMessageExt> msgs;
+                    request->getMessage(msgs);
+                    if ((msgs.size() == 0) && (pullResult.nextBeginOffset > 0)) {
+                        // LOG_DEBUG("maybe misMatch between broker and client happens, update
+                        // consumerOffset to nextBeginOffset indicated by broker");
+                        updateConsumeOffset(messageQueue, pullResult.nextBeginOffset);
+                    }
+                    producePullMsgTask(request);
+
+                    LOG_DEBUG("NO_MATCHED_MSG:%s,nextBeginOffset:%lld",
+                              messageQueue.toString().c_str(), pullResult.nextBeginOffset);
+                    break;
+                }
+                case OFFSET_ILLEGAL: {
+                    request->setNextOffset(pullResult.nextBeginOffset);
+                    producePullMsgTask(request);
+
+                    LOG_DEBUG("OFFSET_ILLEGAL:%s,nextBeginOffset:%lld",
+                              messageQueue.toString().c_str(), pullResult.nextBeginOffset);
+                    break;
+                }
+                case BROKER_TIMEOUT: {  // as BROKER_TIMEOUT is defined by client, broker
+                    // will not returns this status, so this case
+                    // could not be entered.
+                    LOG_ERROR("impossible BROKER_TIMEOUT Occurs");
+                    request->setNextOffset(pullResult.nextBeginOffset);
+                    producePullMsgTask(request);
+                    break;
+                }
+            }
+        } catch (MQException &e) {
+            LOG_ERROR(e.what());
+            producePullMsgTask(request);
+        }
+    }
+
+    AsyncPullCallback *DefaultMQPushConsumer::getAsyncPullCallBack(
+            PullRequest *request, MQMessageQueue msgQueue) {
+        boost::lock_guard<boost::mutex> lock(m_asyncCallbackLock);
+        if (m_asyncPull && request) {
+            PullMAP::iterator it = m_PullCallback.find(msgQueue);
+            if (it == m_PullCallback.end()) {
+                LOG_INFO("new pull callback for mq:%s", msgQueue.toString().c_str());
+                m_PullCallback[msgQueue] = new AsyncPullCallback(this, request);
+            }
+            return m_PullCallback[msgQueue];
+        }
+
+        return NULL;
+    }
+
+    void DefaultMQPushConsumer::shutdownAsyncPullCallBack() {
+        boost::lock_guard<boost::mutex> lock(m_asyncCallbackLock);
+        if (m_asyncPull) {
+            PullMAP::iterator it = m_PullCallback.begin();
+            for (; it != m_PullCallback.end(); ++it) {
+                if (it->second) {
+                    it->second->setShutdownStatus();
+                } else {
+                    LOG_ERROR("could not find asyncPullCallback for:%s",
+                              it->first.toString().c_str());
+                }
+            }
+        }
+    }
+
+    void DefaultMQPushConsumer::pullMessageAsync(PullRequest *request) {
+        if (request == NULL) {
+            LOG_ERROR("Pull request is NULL, return");
+            return;
+        }
+        if (request->isDroped()) {
+            LOG_WARN("Pull request is set drop with mq:%s, return",
+                     (request->m_messageQueue).toString().c_str());
+            request->removePullMsgEvent();
+            return;
+        }
+
+        MQMessageQueue &messageQueue = request->m_messageQueue;
+        if (m_consumerService->getConsumeMsgSerivceListenerType() ==
+            messageListenerOrderly) {
+            if (!request->isLocked() || request->isLockExpired()) {
+                if (!m_pRebalance->lock(messageQueue)) {
+                    producePullMsgTask(request);
+                    return;
+                }
+            }
+        }
+
+        if (request->getCacheMsgCount() > m_maxMsgCacheSize) {
+            // LOG_INFO("retry pullrequest for:%s after 1s, as cachMsgSize:%d is larger
+            // than:%d",  (request->m_messageQueue).toString().c_str(),
+            // request->getCacheMsgCount(), m_maxMsgCacheSize);
+            boost::asio::deadline_timer *t = new boost::asio::deadline_timer(
+                    m_async_ioService, boost::posix_time::milliseconds(1 * 1000));
+            t->async_wait(boost::bind(&DefaultMQPushConsumer::triggerNextPullRequest,
+                                      this, t, request));
+            return;
+        }
+
+        bool commitOffsetEnable = false;
+        int64 commitOffsetValue = 0;
+        if (CLUSTERING == getMessageModel()) {
+            commitOffsetValue = m_pOffsetStore->readOffset(
+                    messageQueue, READ_FROM_MEMORY, getSessionCredentials());
+            if (commitOffsetValue > 0) {
+                commitOffsetEnable = true;
+            }
+        }
+
+        string subExpression;
+        SubscriptionData *pSdata =
+                (m_pRebalance->getSubscriptionData(messageQueue.getTopic()));
+        if (pSdata == NULL) {
+            producePullMsgTask(request);
+            return;
+        }
+        subExpression = pSdata->getSubString();
+
+        int sysFlag =
+                PullSysFlag::buildSysFlag(commitOffsetEnable,      // commitOffset
+                                          true,                    // suspend
+                                          !subExpression.empty(),  // subscription
+                                          false);                  // class filter
+
+        AsyncArg arg;
+        arg.mq = messageQueue;
+        arg.subData = *pSdata;
+        arg.pPullWrapper = m_pPullAPIWrapper;
+
+        try {
+            request->setLastPullTimestamp(UtilAll::currentTimeMillis());
+            m_pPullAPIWrapper->pullKernelImpl(
+                    messageQueue,                                 // 1
+                    subExpression,                                // 2
+                    pSdata->getSubVersion(),                      // 3
+                    request->getNextOffset(),                     // 4
+                    32,                                           // 5
+                    sysFlag,                                      // 6
+                    commitOffsetValue,                            // 7
+                    1000 * 15,                                    // 8
+                    m_asyncPullTimeout,                           // 9
+                    ComMode_ASYNC,                                // 10
+                    getAsyncPullCallBack(request, messageQueue),  // 11
+                    getSessionCredentials(),                      // 12
+                    &arg);                                        // 13
+        } catch (MQException &e) {
+            LOG_ERROR(e.what());
+            producePullMsgTask(request);
+        }
+    }
+
+    void DefaultMQPushConsumer::setAsyncPull(bool asyncFlag) {
+        if (asyncFlag) {
+            LOG_INFO("set pushConsumer:%s to async default pull mode",
+                     getGroupName().c_str());
+        } else {
+            LOG_INFO("set pushConsumer:%s to sync pull mode", getGroupName().c_str());
+        }
+        m_asyncPull = asyncFlag;
+    }
+
+    void DefaultMQPushConsumer::setConsumeThreadCount(int threadCount) {
+        if (threadCount > 0) {
+            m_consumeThreadCount = threadCount;
+        } else {
+            LOG_ERROR("setConsumeThreadCount with invalid value");
+        }
+    }
+
+    int DefaultMQPushConsumer::getConsumeThreadCount() const {
+        return m_consumeThreadCount;
+    }
+
+    void DefaultMQPushConsumer::setPullMsgThreadPoolCount(int threadCount) {
+        m_pullMsgThreadPoolNum = threadCount;
+    }
+
+    int DefaultMQPushConsumer::getPullMsgThreadPoolCount() const {
+        return m_pullMsgThreadPoolNum;
+    }
+
+    int DefaultMQPushConsumer::getConsumeMessageBatchMaxSize() const {
+        return m_consumeMessageBatchMaxSize;
+    }
+
+    void DefaultMQPushConsumer::setConsumeMessageBatchMaxSize(
+            int consumeMessageBatchMaxSize) {
+        if (consumeMessageBatchMaxSize >= 1)
+            m_consumeMessageBatchMaxSize = consumeMessageBatchMaxSize;
+    }
+
+    void DefaultMQPushConsumer::setMaxCacheMsgSizePerQueue(int maxCacheSize) {
+        if (maxCacheSize > 0 && maxCacheSize < 65535) {
+            LOG_INFO("set maxCacheSize to:%d for consumer:%s", maxCacheSize,
+                     getGroupName().c_str());
+            m_maxMsgCacheSize = maxCacheSize;
+        }
+    }
+
+    int DefaultMQPushConsumer::getMaxCacheMsgSizePerQueue() const {
+        return m_maxMsgCacheSize;
+    }
+
+    ConsumerRunningInfo *DefaultMQPushConsumer::getConsumerRunningInfo() {
+        ConsumerRunningInfo *info = new ConsumerRunningInfo();
+        if (info) {
+            if (m_consumerService->getConsumeMsgSerivceListenerType() ==
+                messageListenerOrderly)
+                info->setProperty(ConsumerRunningInfo::PROP_CONSUME_ORDERLY, "true");
+            else
+                info->setProperty(ConsumerRunningInfo::PROP_CONSUME_ORDERLY, "flase");
+            info->setProperty(ConsumerRunningInfo::PROP_THREADPOOL_CORE_SIZE,
+                              UtilAll::to_string(m_consumeThreadCount));
+            info->setProperty(ConsumerRunningInfo::PROP_CONSUMER_START_TIMESTAMP,
+                              UtilAll::to_string(m_startTime));
+
+            vector<SubscriptionData> result;
+            getSubscriptions(result);
+            info->setSubscriptionSet(result);
+
+            map<MQMessageQueue, PullRequest *> requestTable =
+                    m_pRebalance->getPullRequestTable();
+            map<MQMessageQueue, PullRequest *>::iterator it = requestTable.begin();
+
+            for (; it != requestTable.end(); ++it) {
+                if (!it->second->isDroped()) {
+                    map<MessageQueue, ProcessQueueInfo> queueTable;
+                    MessageQueue queue((it->first).getTopic(), (it->first).getBrokerName(),
+                                       (it->first).getQueueId());
+                    ProcessQueueInfo processQueue;
+                    processQueue.cachedMsgMinOffset = it->second->getCacheMinOffset();
+                    processQueue.cachedMsgMaxOffset = it->second->getCacheMaxOffset();
+                    processQueue.cachedMsgCount = it->second->getCacheMsgCount();
+                    processQueue.setCommitOffset(m_pOffsetStore->readOffset(
+                            it->first, MEMORY_FIRST_THEN_STORE, getSessionCredentials()));
+                    processQueue.setDroped(it->second->isDroped());
+                    processQueue.setLocked(it->second->isLocked());
+                    processQueue.lastLockTimestamp = it->second->getLastLockTimestamp();
+                    processQueue.lastPullTimestamp = it->second->getLastPullTimestamp();
+                    processQueue.lastConsumeTimestamp =
+                            it->second->getLastConsumeTimestamp();
+                    info->setMqTable(queue, processQueue);
+                }
+            }
+
+            return info;
+        }
+        return NULL;
+    }
+
+//<!************************************************************************
+}  //<!end namespace;
diff --git a/src/consumer/PullRequest.h b/src/consumer/PullRequest.h
index c62bd7d..f7abfaf 100644
--- a/src/consumer/PullRequest.h
+++ b/src/consumer/PullRequest.h
@@ -85,7 +85,7 @@
   map<int64, MQMessageExt> m_msgTreeMapTemp;
   boost::mutex m_pullRequestLock;
   uint64 m_lastLockTimestamp;  // ms
-  uint64 m_tryUnlockTimes;
+  //uint64 m_tryUnlockTimes;
   uint64 m_lastPullTimestamp;
   uint64 m_lastConsumeTimestamp;
   boost::timed_mutex m_consumeLock;
diff --git a/src/producer/DefaultMQProducer.cpp b/src/producer/DefaultMQProducer.cpp
index 1411ebd..f0b53cf 100755
--- a/src/producer/DefaultMQProducer.cpp
+++ b/src/producer/DefaultMQProducer.cpp
@@ -37,7 +37,7 @@
     : m_sendMsgTimeout(3000),
       m_compressMsgBodyOverHowmuch(4 * 1024),
       m_maxMessageSize(1024 * 128),
-      m_retryAnotherBrokerWhenNotStoreOK(false),
+      //m_retryAnotherBrokerWhenNotStoreOK(false),
       m_compressLevel(5),
       m_retryTimes(5) {
   //<!set default group name;
@@ -50,7 +50,8 @@
 void DefaultMQProducer::start() {
 #ifndef WIN32
   /* Ignore the SIGPIPE */
-  struct sigaction sa = {0};
+  struct sigaction sa;
+  memset(&sa,0, sizeof(struct sigaction));
   sa.sa_handler = SIG_IGN;
   sa.sa_flags = 0;
   sigaction(SIGPIPE, &sa, 0);
diff --git a/src/thread/disruptor/sequence.h b/src/thread/disruptor/sequence.h
index b3e5876..4fb61a4 100755
--- a/src/thread/disruptor/sequence.h
+++ b/src/thread/disruptor/sequence.h
@@ -102,9 +102,9 @@
     PaddedSequence(int64_t initial_value = kInitialCursorValue) :
             Sequence(initial_value) {}
 
- private:
+ //private:
     // padding
-    int64_t padding_[ATOMIC_SEQUENCE_PADDING_LENGTH];
+    //int64_t padding_[ATOMIC_SEQUENCE_PADDING_LENGTH];
 
 };
 
@@ -133,8 +133,8 @@
  public:
      PaddedLong(int64_t initial_value = kInitialCursorValue) :
          MutableLong(initial_value) {}
- private:
-     int64_t padding_[SEQUENCE_PADDING_LENGTH];
+ //private:
+     //int64_t padding_[SEQUENCE_PADDING_LENGTH];
 };
 
 int64_t GetMinimumSequence(
diff --git a/src/transport/TcpTransport.cpp b/src/transport/TcpTransport.cpp
index 62cc19e..011c420 100644
--- a/src/transport/TcpTransport.cpp
+++ b/src/transport/TcpTransport.cpp
@@ -30,12 +30,12 @@
 TcpTransport::TcpTransport(TcpRemotingClient *pTcpRemointClient,
                            READ_CALLBACK handle /* = NULL */)
     : m_tcpConnectStatus(e_connectInit),
-      m_ReadDatathread(NULL),
-      m_readcallback(handle),
-      m_tcpRemotingClient(pTcpRemointClient),
       m_event_base_status(false),
       m_event_base_mtx(),
-      m_event_base_cv() {
+      m_event_base_cv(),
+      m_ReadDatathread(NULL),
+      m_readcallback(handle),
+      m_tcpRemotingClient(pTcpRemointClient){
   m_startTime = UtilAll::currentTimeMillis();
 #ifdef WIN32
   evthread_use_windows_threads();