Fix build warnings
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 3a21ab4..e9d5ca2 100755
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -139,6 +139,8 @@
-fPIC
-fno-strict-aliasing
-std=c++11
+ -Wno-unused-local-typedef
+ -Wno-expansion-to-defined
# -finline-limit=1000
# -Wextra
# -pedantic
diff --git a/include/DefaultMQProducer.h b/include/DefaultMQProducer.h
index 9a8f41f..b3ba2b2 100755
--- a/include/DefaultMQProducer.h
+++ b/include/DefaultMQProducer.h
@@ -97,7 +97,7 @@
int m_sendMsgTimeout;
int m_compressMsgBodyOverHowmuch;
int m_maxMessageSize; //<! default:128K;
- bool m_retryAnotherBrokerWhenNotStoreOK;
+ //bool m_retryAnotherBrokerWhenNotStoreOK;
int m_compressLevel;
int m_retryTimes;
};
diff --git a/src/common/sync_http_client.h b/src/common/sync_http_client.h
index b25cc77..9836605 100755
--- a/src/common/sync_http_client.h
+++ b/src/common/sync_http_client.h
@@ -15,7 +15,7 @@
* limitations under the License.
*/
#ifndef ROCKETMQ_CLIENT4CPP__SYNC_HTTP_CLIENT_H_
-#define ROCKETMQ_CLIENT4CPP_SYNC_HTTP_CLIENT_H_
+#define ROCKETMQ_CLIENT4CPP__SYNC_HTTP_CLIENT_H_
#include <string>
diff --git a/src/consumer/DefaultMQPullConsumer.cpp b/src/consumer/DefaultMQPullConsumer.cpp
index 4aa33f6..073a801 100755
--- a/src/consumer/DefaultMQPullConsumer.cpp
+++ b/src/consumer/DefaultMQPullConsumer.cpp
@@ -58,7 +58,8 @@
void DefaultMQPullConsumer::start() {
#ifndef WIN32
/* Ignore the SIGPIPE */
- struct sigaction sa = {0};
+ struct sigaction sa;
+ memset(&sa,0, sizeof(struct sigaction));
sa.sa_handler = SIG_IGN;
sa.sa_flags = 0;
sigaction(SIGPIPE, &sa, 0);
diff --git a/src/consumer/DefaultMQPushConsumer.cpp b/src/consumer/DefaultMQPushConsumer.cpp
index 8963691..159e0ff 100644
--- a/src/consumer/DefaultMQPushConsumer.cpp
+++ b/src/consumer/DefaultMQPushConsumer.cpp
@@ -1,962 +1,963 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "DefaultMQPushConsumer.h"
-#include "CommunicationMode.h"
-#include "ConsumeMsgService.h"
-#include "ConsumerRunningInfo.h"
-#include "FilterAPI.h"
-#include "Logging.h"
-#include "MQClientAPIImpl.h"
-#include "MQClientFactory.h"
-#include "MQClientManager.h"
-#include "MQProtos.h"
-#include "OffsetStore.h"
-#include "PullAPIWrapper.h"
-#include "PullSysFlag.h"
-#include "Rebalance.h"
-#include "UtilAll.h"
-#include "Validators.h"
-#include "task_queue.h"
-
-namespace rocketmq {
-
- class AsyncPullCallback : public PullCallback {
- public:
- AsyncPullCallback(DefaultMQPushConsumer *pushConsumer, PullRequest *request)
- : m_callbackOwner(pushConsumer),
- m_pullRequest(request),
- m_bShutdown(false) {}
-
- virtual ~AsyncPullCallback() {
- m_callbackOwner = NULL;
- m_pullRequest = NULL;
- }
-
- virtual void onSuccess(MQMessageQueue &mq, PullResult &result,
- bool bProducePullRequest) {
- if (m_bShutdown == true) {
- LOG_INFO("pullrequest for:%s in shutdown, return",
- (m_pullRequest->m_messageQueue).toString().c_str());
- m_pullRequest->removePullMsgEvent();
- return;
- }
-
- switch (result.pullStatus) {
- case FOUND: {
- if (!m_pullRequest->isDroped()) // if request is setted to dropped,
- // don't add msgFoundList to
- // m_msgTreeMap and don't call
- // producePullMsgTask
- { // avoid issue: pullMsg is sent out, rebalance is doing concurrently
- // and this request is dropped, and then received pulled msgs.
- m_pullRequest->setNextOffset(result.nextBeginOffset);
- m_pullRequest->putMessage(result.msgFoundList);
-
- m_callbackOwner->getConsumerMsgService()->submitConsumeRequest(
- m_pullRequest, result.msgFoundList);
-
- if (bProducePullRequest)
- m_callbackOwner->producePullMsgTask(m_pullRequest);
- else
- m_pullRequest->removePullMsgEvent();
-
- LOG_DEBUG("FOUND:%s with size:"
- SIZET_FMT
- ", nextBeginOffset:%lld",
- (m_pullRequest->m_messageQueue).toString().c_str(),
- result.msgFoundList.size(), result.nextBeginOffset);
- } else {
- LOG_INFO("remove pullmsg event of mq:%s",
- (m_pullRequest->m_messageQueue).toString().c_str());
- m_pullRequest->removePullMsgEvent();
- }
- break;
- }
- case NO_NEW_MSG: {
- m_pullRequest->setNextOffset(result.nextBeginOffset);
-
- vector<MQMessageExt> msgs;
- m_pullRequest->getMessage(msgs);
- if ((msgs.size() == 0) && (result.nextBeginOffset > 0)) {
- /*if broker losted/cleared msgs of one msgQueue, but the brokerOffset
- is kept, then consumer will enter following situation:
- 1>. get pull offset with 0 when do rebalance, and set
- m_offsetTable[mq] to 0;
- 2>. NO_NEW_MSG or NO_MATCHED_MSG got when pullMessage, and nextBegin
- offset increase by 800
- 3>. request->getMessage(msgs) always NULL
- 4>. we need update consumerOffset to nextBeginOffset indicated by
- broker
- but if really no new msg could be pulled, also go to this CASE
-
- LOG_INFO("maybe misMatch between broker and client happens, update
- consumerOffset to nextBeginOffset indicated by broker");*/
- m_callbackOwner->updateConsumeOffset(m_pullRequest->m_messageQueue,
- result.nextBeginOffset);
- }
- if (bProducePullRequest)
- m_callbackOwner->producePullMsgTask(m_pullRequest);
- else
- m_pullRequest->removePullMsgEvent();
-
- /*LOG_INFO("NO_NEW_MSG:%s,nextBeginOffset:%lld",
- (m_pullRequest->m_messageQueue).toString().c_str(),
- result.nextBeginOffset);*/
- break;
- }
- case NO_MATCHED_MSG: {
- m_pullRequest->setNextOffset(result.nextBeginOffset);
-
- vector<MQMessageExt> msgs;
- m_pullRequest->getMessage(msgs);
- if ((msgs.size() == 0) && (result.nextBeginOffset > 0)) {
- /*if broker losted/cleared msgs of one msgQueue, but the brokerOffset
- is kept, then consumer will enter following situation:
- 1>. get pull offset with 0 when do rebalance, and set
- m_offsetTable[mq] to 0;
- 2>. NO_NEW_MSG or NO_MATCHED_MSG got when pullMessage, and nextBegin
- offset increase by 800
- 3>. request->getMessage(msgs) always NULL
- 4>. we need update consumerOffset to nextBeginOffset indicated by
- broker
- but if really no new msg could be pulled, also go to this CASE
-
- LOG_INFO("maybe misMatch between broker and client happens, update
- consumerOffset to nextBeginOffset indicated by broker");*/
- m_callbackOwner->updateConsumeOffset(m_pullRequest->m_messageQueue,
- result.nextBeginOffset);
- }
- if (bProducePullRequest)
- m_callbackOwner->producePullMsgTask(m_pullRequest);
- else
- m_pullRequest->removePullMsgEvent();
- /*LOG_INFO("NO_MATCHED_MSG:%s,nextBeginOffset:%lld",
- (m_pullRequest->m_messageQueue).toString().c_str(),
- result.nextBeginOffset);*/
- break;
- }
- case OFFSET_ILLEGAL: {
- m_pullRequest->setNextOffset(result.nextBeginOffset);
- if (bProducePullRequest)
- m_callbackOwner->producePullMsgTask(m_pullRequest);
- else
- m_pullRequest->removePullMsgEvent();
-
- /*LOG_INFO("OFFSET_ILLEGAL:%s,nextBeginOffset:%lld",
- (m_pullRequest->m_messageQueue).toString().c_str(),
- result.nextBeginOffset);*/
- break;
- }
- case BROKER_TIMEOUT: { // as BROKER_TIMEOUT is defined by client, broker
- // will not returns this status, so this case
- // could not be entered.
- LOG_ERROR("impossible BROKER_TIMEOUT Occurs");
- m_pullRequest->setNextOffset(result.nextBeginOffset);
- if (bProducePullRequest)
- m_callbackOwner->producePullMsgTask(m_pullRequest);
- else
- m_pullRequest->removePullMsgEvent();
- break;
- }
- }
- }
-
- virtual void onException(MQException &e) {
- if (m_bShutdown == true) {
- LOG_INFO("pullrequest for:%s in shutdown, return",
- (m_pullRequest->m_messageQueue).toString().c_str());
- m_pullRequest->removePullMsgEvent();
- return;
- }
- LOG_WARN("pullrequest for:%s occurs exception, reproduce it",
- (m_pullRequest->m_messageQueue).toString().c_str());
- m_callbackOwner->producePullMsgTask(m_pullRequest);
- }
-
- void setShutdownStatus() { m_bShutdown = true; }
-
- private:
- DefaultMQPushConsumer *m_callbackOwner;
- PullRequest *m_pullRequest;
- bool m_bShutdown;
- };
-
-//<!***************************************************************************
- static boost::mutex m_asyncCallbackLock;
-
- DefaultMQPushConsumer::DefaultMQPushConsumer(const string &groupname)
- : m_consumeFromWhere(CONSUME_FROM_LAST_OFFSET),
- m_pOffsetStore(NULL),
- m_pPullAPIWrapper(NULL),
- m_pMessageListener(NULL),
- m_consumeMessageBatchMaxSize(1),
- m_maxMsgCacheSize(1000) {
- //<!set default group name;
- string gname = groupname.empty() ? DEFAULT_CONSUMER_GROUP : groupname;
- setGroupName(gname);
- m_asyncPull = true;
- m_asyncPullTimeout = 30 * 1000;
- setMessageModel(CLUSTERING);
-
- m_startTime = UtilAll::currentTimeMillis();
- m_consumeThreadCount = boost::thread::hardware_concurrency();
- m_pullMsgThreadPoolNum = boost::thread::hardware_concurrency();
- m_async_service_thread.reset(new boost::thread(
- boost::bind(&DefaultMQPushConsumer::boost_asio_work, this)));
- }
-
- void DefaultMQPushConsumer::boost_asio_work() {
- LOG_INFO("DefaultMQPushConsumer::boost asio async service runing");
- boost::asio::io_service::work work(m_async_ioService); // avoid async io
- // service stops after
- // first timer timeout
- // callback
- m_async_ioService.run();
- }
-
- DefaultMQPushConsumer::~DefaultMQPushConsumer() {
- m_pMessageListener = NULL;
- if (m_pullmsgQueue != NULL) {
- deleteAndZero(m_pullmsgQueue);
- }
- if (m_pRebalance != NULL) {
- deleteAndZero(m_pRebalance);
- }
- if (m_pOffsetStore != NULL) {
- deleteAndZero(m_pOffsetStore);
- }
- if (m_pPullAPIWrapper != NULL) {
- deleteAndZero(m_pPullAPIWrapper);
- }
- if (m_consumerService != NULL) {
- deleteAndZero(m_consumerService);
- }
- PullMAP::iterator it = m_PullCallback.begin();
- for (; it != m_PullCallback.end(); ++it) {
- deleteAndZero(it->second);
- }
- m_PullCallback.clear();
- m_subTopics.clear();
- }
-
- void DefaultMQPushConsumer::sendMessageBack(MQMessageExt &msg, int delayLevel) {
- try {
- getFactory()->getMQClientAPIImpl()->consumerSendMessageBack(
- msg, getGroupName(), delayLevel, 3000, getSessionCredentials());
- } catch (MQException &e) {
- LOG_ERROR(e.what());
- }
- }
-
- void DefaultMQPushConsumer::fetchSubscribeMessageQueues(
- const string &topic, vector<MQMessageQueue> &mqs) {
- mqs.clear();
- try {
- getFactory()->fetchSubscribeMessageQueues(topic, mqs,
- getSessionCredentials());
- } catch (MQException &e) {
- LOG_ERROR(e.what());
- }
- }
-
- void DefaultMQPushConsumer::doRebalance() {
- if (isServiceStateOk()) {
- try {
- m_pRebalance->doRebalance();
- } catch (MQException &e) {
- LOG_ERROR(e.what());
- }
- }
- }
-
- void DefaultMQPushConsumer::persistConsumerOffset() {
- if (isServiceStateOk()) {
- m_pRebalance->persistConsumerOffset();
- }
- }
-
- void DefaultMQPushConsumer::persistConsumerOffsetByResetOffset() {
- if (isServiceStateOk()) {
- m_pRebalance->persistConsumerOffsetByResetOffset();
- }
- }
-
- void DefaultMQPushConsumer::start() {
-#ifndef WIN32
- /* Ignore the SIGPIPE */
- struct sigaction sa = {0};
- sa.sa_handler = SIG_IGN;
- sa.sa_flags = 0;
- sigaction(SIGPIPE, &sa, 0);
-#endif
- switch (m_serviceState) {
- case CREATE_JUST: {
- m_serviceState = START_FAILED;
- MQClient::start();
- LOG_INFO("DefaultMQPushConsumer:%s start", m_GroupName.c_str());
-
- //<!data;
- checkConfig();
-
- //<!create rebalance;
- m_pRebalance = new RebalancePush(this, getFactory());
-
- string groupname = getGroupName();
- m_pPullAPIWrapper = new PullAPIWrapper(getFactory(), groupname);
-
- if (m_pMessageListener) {
- if (m_pMessageListener->getMessageListenerType() ==
- messageListenerOrderly) {
- LOG_INFO("start orderly consume service:%s", getGroupName().c_str());
- m_consumerService = new ConsumeMessageOrderlyService(
- this, m_consumeThreadCount, m_pMessageListener);
- } else // for backward compatible, defaultly and concurrently listeners
- // are allocating ConsumeMessageConcurrentlyService
- {
- LOG_INFO("start concurrently consume service:%s",
- getGroupName().c_str());
- m_consumerService = new ConsumeMessageConcurrentlyService(
- this, m_consumeThreadCount, m_pMessageListener);
- }
- }
-
- m_pullmsgQueue = new TaskQueue(m_pullMsgThreadPoolNum);
- m_pullmsgThread.reset(new boost::thread(boost::bind(
- &DefaultMQPushConsumer::runPullMsgQueue, this, m_pullmsgQueue)));
-
- copySubscription();
-
- //<! registe;
- bool registerOK = getFactory()->registerConsumer(this);
- if (!registerOK) {
- m_serviceState = CREATE_JUST;
- THROW_MQEXCEPTION(
- MQClientException,
- "The cousumer group[" + getGroupName() +
- "] has been created before, specify another name please.",
- -1);
- }
-
- //<!msg model;
- switch (getMessageModel()) {
- case BROADCASTING:
- m_pOffsetStore = new LocalFileOffsetStore(groupname, getFactory());
- break;
- case CLUSTERING:
- m_pOffsetStore = new RemoteBrokerOffsetStore(groupname, getFactory());
- break;
- }
- bool bStartFailed = false;
- string errorMsg;
- try {
- m_pOffsetStore->load();
- } catch (MQClientException &e) {
- bStartFailed = true;
- errorMsg = std::string(e.what());
- }
- m_consumerService->start();
-
- getFactory()->start();
-
- updateTopicSubscribeInfoWhenSubscriptionChanged();
- getFactory()->sendHeartbeatToAllBroker();
-
- m_serviceState = RUNNING;
- if (bStartFailed) {
- shutdown();
- THROW_MQEXCEPTION(MQClientException, errorMsg, -1);
- }
- break;
- }
- case RUNNING:
- case START_FAILED:
- case SHUTDOWN_ALREADY:
- break;
- default:
- break;
- }
-
- getFactory()->rebalanceImmediately();
- }
-
- void DefaultMQPushConsumer::shutdown() {
- switch (m_serviceState) {
- case RUNNING: {
- LOG_INFO("DefaultMQPushConsumer shutdown");
- m_async_ioService.stop();
- m_async_service_thread->interrupt();
- m_async_service_thread->join();
- m_pullmsgQueue->close();
- m_pullmsgThread->interrupt();
- m_pullmsgThread->join();
- m_consumerService->shutdown();
- persistConsumerOffset();
- shutdownAsyncPullCallBack(); // delete aync pullMsg resources
- getFactory()->unregisterConsumer(this);
- getFactory()->shutdown();
- m_serviceState = SHUTDOWN_ALREADY;
- break;
- }
- case CREATE_JUST:
- case SHUTDOWN_ALREADY:
- break;
- default:
- break;
- }
- }
-
- void DefaultMQPushConsumer::registerMessageListener(
- MQMessageListener *pMessageListener) {
- if (NULL != pMessageListener) {
- m_pMessageListener = pMessageListener;
- }
- }
-
- MessageListenerType DefaultMQPushConsumer::getMessageListenerType() {
- if (NULL != m_pMessageListener) {
- return m_pMessageListener->getMessageListenerType();
- }
- return messageListenerDefaultly;
- }
-
- ConsumeMsgService *DefaultMQPushConsumer::getConsumerMsgService() const {
- return m_consumerService;
- }
-
- OffsetStore *DefaultMQPushConsumer::getOffsetStore() const {
- return m_pOffsetStore;
- }
-
- Rebalance *DefaultMQPushConsumer::getRebalance() const { return m_pRebalance; }
-
- void DefaultMQPushConsumer::subscribe(const string &topic,
- const string &subExpression) {
- m_subTopics[topic] = subExpression;
- }
-
- void DefaultMQPushConsumer::checkConfig() {
- string groupname = getGroupName();
- // check consumerGroup
- Validators::checkGroup(groupname);
-
- // consumerGroup
- if (!groupname.compare(DEFAULT_CONSUMER_GROUP)) {
- THROW_MQEXCEPTION(MQClientException,
- "consumerGroup can not equal DEFAULT_CONSUMER", -1);
- }
-
- if (getMessageModel() != BROADCASTING && getMessageModel() != CLUSTERING) {
- THROW_MQEXCEPTION(MQClientException, "messageModel is valid ", -1);
- }
-
- if (m_pMessageListener == NULL) {
- THROW_MQEXCEPTION(MQClientException, "messageListener is null ", -1);
- }
- }
-
- void DefaultMQPushConsumer::copySubscription() {
- map<string, string>::iterator it = m_subTopics.begin();
- for (; it != m_subTopics.end(); ++it) {
- LOG_INFO("buildSubscriptionData,:%s,%s", it->first.c_str(),
- it->second.c_str());
- unique_ptr<SubscriptionData> pSData(
- FilterAPI::buildSubscriptionData(it->first, it->second));
-
- m_pRebalance->setSubscriptionData(it->first, pSData.release());
- }
-
- switch (getMessageModel()) {
- case BROADCASTING:
- break;
- case CLUSTERING: {
- string retryTopic = UtilAll::getRetryTopic(getGroupName());
-
- //<!this sub;
- unique_ptr<SubscriptionData> pSData(
- FilterAPI::buildSubscriptionData(retryTopic, SUB_ALL));
-
- m_pRebalance->setSubscriptionData(retryTopic, pSData.release());
- break;
- }
- default:
- break;
- }
- }
-
- void DefaultMQPushConsumer::updateTopicSubscribeInfo(
- const string &topic, vector<MQMessageQueue> &info) {
- m_pRebalance->setTopicSubscribeInfo(topic, info);
- }
-
- void DefaultMQPushConsumer::updateTopicSubscribeInfoWhenSubscriptionChanged() {
- map<string, SubscriptionData *> &subTable =
- m_pRebalance->getSubscriptionInner();
- map<string, SubscriptionData *>::iterator it = subTable.begin();
- for (; it != subTable.end(); ++it) {
- bool btopic = getFactory()->updateTopicRouteInfoFromNameServer(
- it->first, getSessionCredentials());
- if (btopic == false) {
- LOG_WARN("The topic:[%s] not exist", it->first.c_str());
- }
- }
- }
-
- ConsumeType DefaultMQPushConsumer::getConsumeType() {
- return CONSUME_PASSIVELY;
- }
-
- ConsumeFromWhere DefaultMQPushConsumer::getConsumeFromWhere() {
- return m_consumeFromWhere;
- }
-
- void DefaultMQPushConsumer::setConsumeFromWhere(
- ConsumeFromWhere consumeFromWhere) {
- m_consumeFromWhere = consumeFromWhere;
- }
-
- void DefaultMQPushConsumer::getSubscriptions(vector<SubscriptionData> &result) {
- map<string, SubscriptionData *> &subTable =
- m_pRebalance->getSubscriptionInner();
- map<string, SubscriptionData *>::iterator it = subTable.begin();
- for (; it != subTable.end(); ++it) {
- result.push_back(*(it->second));
- }
- }
-
- void DefaultMQPushConsumer::updateConsumeOffset(const MQMessageQueue &mq,
- int64 offset) {
- if (offset >= 0) {
- m_pOffsetStore->updateOffset(mq, offset);
- } else {
- LOG_ERROR("updateConsumeOffset of mq:%s error", mq.toString().c_str());
- }
- }
-
- void DefaultMQPushConsumer::removeConsumeOffset(const MQMessageQueue &mq) {
- m_pOffsetStore->removeOffset(mq);
- }
-
- void DefaultMQPushConsumer::triggerNextPullRequest(
- boost::asio::deadline_timer *t, PullRequest *request) {
- // LOG_INFO("trigger pullrequest for:%s",
- // (request->m_messageQueue).toString().c_str());
- producePullMsgTask(request);
- deleteAndZero(t);
- }
-
- void DefaultMQPushConsumer::producePullMsgTask(PullRequest *request) {
- if (m_pullmsgQueue->bTaskQueueStatusOK() && isServiceStateOk()) {
- request->addPullMsgEvent();
- if (m_asyncPull) {
- m_pullmsgQueue->produce(TaskBinder::gen(
- &DefaultMQPushConsumer::pullMessageAsync, this, request));
- } else {
- m_pullmsgQueue->produce(
- TaskBinder::gen(&DefaultMQPushConsumer::pullMessage, this, request));
- }
- } else {
- LOG_WARN("produce pullmsg of mq:%s failed",
- request->m_messageQueue.toString().c_str());
- }
- }
-
- void DefaultMQPushConsumer::runPullMsgQueue(TaskQueue *pTaskQueue) {
- pTaskQueue->run();
- }
-
- void DefaultMQPushConsumer::pullMessage(PullRequest *request) {
- if (request == NULL) {
- LOG_ERROR("Pull request is NULL, return");
- return;
- }
- if (request->isDroped()) {
- LOG_WARN("Pull request is set drop with mq:%s, return",
- (request->m_messageQueue).toString().c_str());
- request->removePullMsgEvent();
- return;
- }
-
- MQMessageQueue &messageQueue = request->m_messageQueue;
- if (m_consumerService->getConsumeMsgSerivceListenerType() ==
- messageListenerOrderly) {
- if (!request->isLocked() || request->isLockExpired()) {
- if (!m_pRebalance->lock(messageQueue)) {
- producePullMsgTask(request);
- return;
- }
- }
- }
-
- if (request->getCacheMsgCount() > m_maxMsgCacheSize) {
- // LOG_INFO("retry pullrequest for:%s after 1s, as cachMsgSize:%d is larger
- // than:%d", (request->m_messageQueue).toString().c_str(),
- // request->getCacheMsgCount(), m_maxMsgCacheSize);
- boost::asio::deadline_timer *t = new boost::asio::deadline_timer(
- m_async_ioService, boost::posix_time::milliseconds(1 * 1000));
- t->async_wait(boost::bind(&DefaultMQPushConsumer::triggerNextPullRequest,
- this, t, request));
- return;
- }
-
- bool commitOffsetEnable = false;
- int64 commitOffsetValue = 0;
- if (CLUSTERING == getMessageModel()) {
- commitOffsetValue = m_pOffsetStore->readOffset(
- messageQueue, READ_FROM_MEMORY, getSessionCredentials());
- if (commitOffsetValue > 0) {
- commitOffsetEnable = true;
- }
- }
-
- string subExpression;
- SubscriptionData *pSdata =
- m_pRebalance->getSubscriptionData(messageQueue.getTopic());
- if (pSdata == NULL) {
- producePullMsgTask(request);
- return;
- }
- subExpression = pSdata->getSubString();
-
- int sysFlag =
- PullSysFlag::buildSysFlag(commitOffsetEnable, // commitOffset
- false, // suspend
- !subExpression.empty(), // subscription
- false); // class filter
-
- try {
- request->setLastPullTimestamp(UtilAll::currentTimeMillis());
- unique_ptr<PullResult> result(
- m_pPullAPIWrapper->pullKernelImpl(messageQueue, // 1
- subExpression, // 2
- pSdata->getSubVersion(), // 3
- request->getNextOffset(), // 4
- 32, // 5
- sysFlag, // 6
- commitOffsetValue, // 7
- 1000 * 15, // 8
- 1000 * 30, // 9
- ComMode_SYNC, // 10
- NULL, getSessionCredentials()));
-
- PullResult pullResult = m_pPullAPIWrapper->processPullResult(
- messageQueue, result.get(), pSdata);
-
- switch (pullResult.pullStatus) {
- case FOUND: {
- if (!request->isDroped()) // if request is setted to dropped, don't add
- // msgFoundList to m_msgTreeMap and don't
- // call producePullMsgTask
- { // avoid issue: pullMsg is sent out, rebalance is doing concurrently
- // and this request is dropped, and then received pulled msgs.
- request->setNextOffset(pullResult.nextBeginOffset);
- request->putMessage(pullResult.msgFoundList);
-
- m_consumerService->submitConsumeRequest(request,
- pullResult.msgFoundList);
- producePullMsgTask(request);
-
- LOG_DEBUG("FOUND:%s with size:"
- SIZET_FMT
- ",nextBeginOffset:%lld",
- messageQueue.toString().c_str(),
- pullResult.msgFoundList.size(), pullResult.nextBeginOffset);
- } else {
- request->removePullMsgEvent();
- }
- break;
- }
- case NO_NEW_MSG: {
- request->setNextOffset(pullResult.nextBeginOffset);
- vector<MQMessageExt> msgs;
- request->getMessage(msgs);
- if ((msgs.size() == 0) && (pullResult.nextBeginOffset > 0)) {
- /*if broker losted/cleared msgs of one msgQueue, but the brokerOffset
- is kept, then consumer will enter following situation:
- 1>. get pull offset with 0 when do rebalance, and set
- m_offsetTable[mq] to 0;
- 2>. NO_NEW_MSG or NO_MATCHED_MSG got when pullMessage, and nextBegin
- offset increase by 800
- 3>. request->getMessage(msgs) always NULL
- 4>. we need update consumerOffset to nextBeginOffset indicated by
- broker
- but if really no new msg could be pulled, also go to this CASE
- */
- // LOG_DEBUG("maybe misMatch between broker and client happens, update
- // consumerOffset to nextBeginOffset indicated by broker");
- updateConsumeOffset(messageQueue, pullResult.nextBeginOffset);
- }
- producePullMsgTask(request);
- LOG_DEBUG("NO_NEW_MSG:%s,nextBeginOffset:%lld",
- messageQueue.toString().c_str(), pullResult.nextBeginOffset);
- break;
- }
- case NO_MATCHED_MSG: {
- request->setNextOffset(pullResult.nextBeginOffset);
- vector<MQMessageExt> msgs;
- request->getMessage(msgs);
- if ((msgs.size() == 0) && (pullResult.nextBeginOffset > 0)) {
- // LOG_DEBUG("maybe misMatch between broker and client happens, update
- // consumerOffset to nextBeginOffset indicated by broker");
- updateConsumeOffset(messageQueue, pullResult.nextBeginOffset);
- }
- producePullMsgTask(request);
-
- LOG_DEBUG("NO_MATCHED_MSG:%s,nextBeginOffset:%lld",
- messageQueue.toString().c_str(), pullResult.nextBeginOffset);
- break;
- }
- case OFFSET_ILLEGAL: {
- request->setNextOffset(pullResult.nextBeginOffset);
- producePullMsgTask(request);
-
- LOG_DEBUG("OFFSET_ILLEGAL:%s,nextBeginOffset:%lld",
- messageQueue.toString().c_str(), pullResult.nextBeginOffset);
- break;
- }
- case BROKER_TIMEOUT: { // as BROKER_TIMEOUT is defined by client, broker
- // will not returns this status, so this case
- // could not be entered.
- LOG_ERROR("impossible BROKER_TIMEOUT Occurs");
- request->setNextOffset(pullResult.nextBeginOffset);
- producePullMsgTask(request);
- break;
- }
- }
- } catch (MQException &e) {
- LOG_ERROR(e.what());
- producePullMsgTask(request);
- }
- }
-
- AsyncPullCallback *DefaultMQPushConsumer::getAsyncPullCallBack(
- PullRequest *request, MQMessageQueue msgQueue) {
- boost::lock_guard<boost::mutex> lock(m_asyncCallbackLock);
- if (m_asyncPull && request) {
- PullMAP::iterator it = m_PullCallback.find(msgQueue);
- if (it == m_PullCallback.end()) {
- LOG_INFO("new pull callback for mq:%s", msgQueue.toString().c_str());
- m_PullCallback[msgQueue] = new AsyncPullCallback(this, request);
- }
- return m_PullCallback[msgQueue];
- }
-
- return NULL;
- }
-
- void DefaultMQPushConsumer::shutdownAsyncPullCallBack() {
- boost::lock_guard<boost::mutex> lock(m_asyncCallbackLock);
- if (m_asyncPull) {
- PullMAP::iterator it = m_PullCallback.begin();
- for (; it != m_PullCallback.end(); ++it) {
- if (it->second) {
- it->second->setShutdownStatus();
- } else {
- LOG_ERROR("could not find asyncPullCallback for:%s",
- it->first.toString().c_str());
- }
- }
- }
- }
-
- void DefaultMQPushConsumer::pullMessageAsync(PullRequest *request) {
- if (request == NULL) {
- LOG_ERROR("Pull request is NULL, return");
- return;
- }
- if (request->isDroped()) {
- LOG_WARN("Pull request is set drop with mq:%s, return",
- (request->m_messageQueue).toString().c_str());
- request->removePullMsgEvent();
- return;
- }
-
- MQMessageQueue &messageQueue = request->m_messageQueue;
- if (m_consumerService->getConsumeMsgSerivceListenerType() ==
- messageListenerOrderly) {
- if (!request->isLocked() || request->isLockExpired()) {
- if (!m_pRebalance->lock(messageQueue)) {
- producePullMsgTask(request);
- return;
- }
- }
- }
-
- if (request->getCacheMsgCount() > m_maxMsgCacheSize) {
- // LOG_INFO("retry pullrequest for:%s after 1s, as cachMsgSize:%d is larger
- // than:%d", (request->m_messageQueue).toString().c_str(),
- // request->getCacheMsgCount(), m_maxMsgCacheSize);
- boost::asio::deadline_timer *t = new boost::asio::deadline_timer(
- m_async_ioService, boost::posix_time::milliseconds(1 * 1000));
- t->async_wait(boost::bind(&DefaultMQPushConsumer::triggerNextPullRequest,
- this, t, request));
- return;
- }
-
- bool commitOffsetEnable = false;
- int64 commitOffsetValue = 0;
- if (CLUSTERING == getMessageModel()) {
- commitOffsetValue = m_pOffsetStore->readOffset(
- messageQueue, READ_FROM_MEMORY, getSessionCredentials());
- if (commitOffsetValue > 0) {
- commitOffsetEnable = true;
- }
- }
-
- string subExpression;
- SubscriptionData *pSdata =
- (m_pRebalance->getSubscriptionData(messageQueue.getTopic()));
- if (pSdata == NULL) {
- producePullMsgTask(request);
- return;
- }
- subExpression = pSdata->getSubString();
-
- int sysFlag =
- PullSysFlag::buildSysFlag(commitOffsetEnable, // commitOffset
- true, // suspend
- !subExpression.empty(), // subscription
- false); // class filter
-
- AsyncArg arg;
- arg.mq = messageQueue;
- arg.subData = *pSdata;
- arg.pPullWrapper = m_pPullAPIWrapper;
-
- try {
- request->setLastPullTimestamp(UtilAll::currentTimeMillis());
- m_pPullAPIWrapper->pullKernelImpl(
- messageQueue, // 1
- subExpression, // 2
- pSdata->getSubVersion(), // 3
- request->getNextOffset(), // 4
- 32, // 5
- sysFlag, // 6
- commitOffsetValue, // 7
- 1000 * 15, // 8
- m_asyncPullTimeout, // 9
- ComMode_ASYNC, // 10
- getAsyncPullCallBack(request, messageQueue), // 11
- getSessionCredentials(), // 12
- &arg); // 13
- } catch (MQException &e) {
- LOG_ERROR(e.what());
- producePullMsgTask(request);
- }
- }
-
- void DefaultMQPushConsumer::setAsyncPull(bool asyncFlag) {
- if (asyncFlag) {
- LOG_INFO("set pushConsumer:%s to async default pull mode",
- getGroupName().c_str());
- } else {
- LOG_INFO("set pushConsumer:%s to sync pull mode", getGroupName().c_str());
- }
- m_asyncPull = asyncFlag;
- }
-
- void DefaultMQPushConsumer::setConsumeThreadCount(int threadCount) {
- if (threadCount > 0) {
- m_consumeThreadCount = threadCount;
- } else {
- LOG_ERROR("setConsumeThreadCount with invalid value");
- }
- }
-
- int DefaultMQPushConsumer::getConsumeThreadCount() const {
- return m_consumeThreadCount;
- }
-
- void DefaultMQPushConsumer::setPullMsgThreadPoolCount(int threadCount) {
- m_pullMsgThreadPoolNum = threadCount;
- }
-
- int DefaultMQPushConsumer::getPullMsgThreadPoolCount() const {
- return m_pullMsgThreadPoolNum;
- }
-
- int DefaultMQPushConsumer::getConsumeMessageBatchMaxSize() const {
- return m_consumeMessageBatchMaxSize;
- }
-
- void DefaultMQPushConsumer::setConsumeMessageBatchMaxSize(
- int consumeMessageBatchMaxSize) {
- if (consumeMessageBatchMaxSize >= 1)
- m_consumeMessageBatchMaxSize = consumeMessageBatchMaxSize;
- }
-
- void DefaultMQPushConsumer::setMaxCacheMsgSizePerQueue(int maxCacheSize) {
- if (maxCacheSize > 0 && maxCacheSize < 65535) {
- LOG_INFO("set maxCacheSize to:%d for consumer:%s", maxCacheSize,
- getGroupName().c_str());
- m_maxMsgCacheSize = maxCacheSize;
- }
- }
-
- int DefaultMQPushConsumer::getMaxCacheMsgSizePerQueue() const {
- return m_maxMsgCacheSize;
- }
-
- ConsumerRunningInfo *DefaultMQPushConsumer::getConsumerRunningInfo() {
- ConsumerRunningInfo *info = new ConsumerRunningInfo();
- if (info) {
- if (m_consumerService->getConsumeMsgSerivceListenerType() ==
- messageListenerOrderly)
- info->setProperty(ConsumerRunningInfo::PROP_CONSUME_ORDERLY, "true");
- else
- info->setProperty(ConsumerRunningInfo::PROP_CONSUME_ORDERLY, "flase");
- info->setProperty(ConsumerRunningInfo::PROP_THREADPOOL_CORE_SIZE,
- UtilAll::to_string(m_consumeThreadCount));
- info->setProperty(ConsumerRunningInfo::PROP_CONSUMER_START_TIMESTAMP,
- UtilAll::to_string(m_startTime));
-
- vector<SubscriptionData> result;
- getSubscriptions(result);
- info->setSubscriptionSet(result);
-
- map<MQMessageQueue, PullRequest *> requestTable =
- m_pRebalance->getPullRequestTable();
- map<MQMessageQueue, PullRequest *>::iterator it = requestTable.begin();
-
- for (; it != requestTable.end(); ++it) {
- if (!it->second->isDroped()) {
- map<MessageQueue, ProcessQueueInfo> queueTable;
- MessageQueue queue((it->first).getTopic(), (it->first).getBrokerName(),
- (it->first).getQueueId());
- ProcessQueueInfo processQueue;
- processQueue.cachedMsgMinOffset = it->second->getCacheMinOffset();
- processQueue.cachedMsgMaxOffset = it->second->getCacheMaxOffset();
- processQueue.cachedMsgCount = it->second->getCacheMsgCount();
- processQueue.setCommitOffset(m_pOffsetStore->readOffset(
- it->first, MEMORY_FIRST_THEN_STORE, getSessionCredentials()));
- processQueue.setDroped(it->second->isDroped());
- processQueue.setLocked(it->second->isLocked());
- processQueue.lastLockTimestamp = it->second->getLastLockTimestamp();
- processQueue.lastPullTimestamp = it->second->getLastPullTimestamp();
- processQueue.lastConsumeTimestamp =
- it->second->getLastConsumeTimestamp();
- info->setMqTable(queue, processQueue);
- }
- }
-
- return info;
- }
- return NULL;
- }
-
-//<!************************************************************************
-} //<!end namespace;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "DefaultMQPushConsumer.h"
+#include "CommunicationMode.h"
+#include "ConsumeMsgService.h"
+#include "ConsumerRunningInfo.h"
+#include "FilterAPI.h"
+#include "Logging.h"
+#include "MQClientAPIImpl.h"
+#include "MQClientFactory.h"
+#include "MQClientManager.h"
+#include "MQProtos.h"
+#include "OffsetStore.h"
+#include "PullAPIWrapper.h"
+#include "PullSysFlag.h"
+#include "Rebalance.h"
+#include "UtilAll.h"
+#include "Validators.h"
+#include "task_queue.h"
+
+namespace rocketmq {
+
+ class AsyncPullCallback : public PullCallback {
+ public:
+ AsyncPullCallback(DefaultMQPushConsumer *pushConsumer, PullRequest *request)
+ : m_callbackOwner(pushConsumer),
+ m_pullRequest(request),
+ m_bShutdown(false) {}
+
+ virtual ~AsyncPullCallback() {
+ m_callbackOwner = NULL;
+ m_pullRequest = NULL;
+ }
+
+ virtual void onSuccess(MQMessageQueue &mq, PullResult &result,
+ bool bProducePullRequest) {
+ if (m_bShutdown == true) {
+ LOG_INFO("pullrequest for:%s in shutdown, return",
+ (m_pullRequest->m_messageQueue).toString().c_str());
+ m_pullRequest->removePullMsgEvent();
+ return;
+ }
+
+ switch (result.pullStatus) {
+ case FOUND: {
+ if (!m_pullRequest->isDroped()) // if request is setted to dropped,
+ // don't add msgFoundList to
+ // m_msgTreeMap and don't call
+ // producePullMsgTask
+ { // avoid issue: pullMsg is sent out, rebalance is doing concurrently
+ // and this request is dropped, and then received pulled msgs.
+ m_pullRequest->setNextOffset(result.nextBeginOffset);
+ m_pullRequest->putMessage(result.msgFoundList);
+
+ m_callbackOwner->getConsumerMsgService()->submitConsumeRequest(
+ m_pullRequest, result.msgFoundList);
+
+ if (bProducePullRequest)
+ m_callbackOwner->producePullMsgTask(m_pullRequest);
+ else
+ m_pullRequest->removePullMsgEvent();
+
+ LOG_DEBUG("FOUND:%s with size:"
+ SIZET_FMT
+ ", nextBeginOffset:%lld",
+ (m_pullRequest->m_messageQueue).toString().c_str(),
+ result.msgFoundList.size(), result.nextBeginOffset);
+ } else {
+ LOG_INFO("remove pullmsg event of mq:%s",
+ (m_pullRequest->m_messageQueue).toString().c_str());
+ m_pullRequest->removePullMsgEvent();
+ }
+ break;
+ }
+ case NO_NEW_MSG: {
+ m_pullRequest->setNextOffset(result.nextBeginOffset);
+
+ vector<MQMessageExt> msgs;
+ m_pullRequest->getMessage(msgs);
+ if ((msgs.size() == 0) && (result.nextBeginOffset > 0)) {
+ /*if broker losted/cleared msgs of one msgQueue, but the brokerOffset
+ is kept, then consumer will enter following situation:
+ 1>. get pull offset with 0 when do rebalance, and set
+ m_offsetTable[mq] to 0;
+ 2>. NO_NEW_MSG or NO_MATCHED_MSG got when pullMessage, and nextBegin
+ offset increase by 800
+ 3>. request->getMessage(msgs) always NULL
+ 4>. we need update consumerOffset to nextBeginOffset indicated by
+ broker
+ but if really no new msg could be pulled, also go to this CASE
+
+ LOG_INFO("maybe misMatch between broker and client happens, update
+ consumerOffset to nextBeginOffset indicated by broker");*/
+ m_callbackOwner->updateConsumeOffset(m_pullRequest->m_messageQueue,
+ result.nextBeginOffset);
+ }
+ if (bProducePullRequest)
+ m_callbackOwner->producePullMsgTask(m_pullRequest);
+ else
+ m_pullRequest->removePullMsgEvent();
+
+ /*LOG_INFO("NO_NEW_MSG:%s,nextBeginOffset:%lld",
+ (m_pullRequest->m_messageQueue).toString().c_str(),
+ result.nextBeginOffset);*/
+ break;
+ }
+ case NO_MATCHED_MSG: {
+ m_pullRequest->setNextOffset(result.nextBeginOffset);
+
+ vector<MQMessageExt> msgs;
+ m_pullRequest->getMessage(msgs);
+ if ((msgs.size() == 0) && (result.nextBeginOffset > 0)) {
+ /*if broker losted/cleared msgs of one msgQueue, but the brokerOffset
+ is kept, then consumer will enter following situation:
+ 1>. get pull offset with 0 when do rebalance, and set
+ m_offsetTable[mq] to 0;
+ 2>. NO_NEW_MSG or NO_MATCHED_MSG got when pullMessage, and nextBegin
+ offset increase by 800
+ 3>. request->getMessage(msgs) always NULL
+ 4>. we need update consumerOffset to nextBeginOffset indicated by
+ broker
+ but if really no new msg could be pulled, also go to this CASE
+
+ LOG_INFO("maybe misMatch between broker and client happens, update
+ consumerOffset to nextBeginOffset indicated by broker");*/
+ m_callbackOwner->updateConsumeOffset(m_pullRequest->m_messageQueue,
+ result.nextBeginOffset);
+ }
+ if (bProducePullRequest)
+ m_callbackOwner->producePullMsgTask(m_pullRequest);
+ else
+ m_pullRequest->removePullMsgEvent();
+ /*LOG_INFO("NO_MATCHED_MSG:%s,nextBeginOffset:%lld",
+ (m_pullRequest->m_messageQueue).toString().c_str(),
+ result.nextBeginOffset);*/
+ break;
+ }
+ case OFFSET_ILLEGAL: {
+ m_pullRequest->setNextOffset(result.nextBeginOffset);
+ if (bProducePullRequest)
+ m_callbackOwner->producePullMsgTask(m_pullRequest);
+ else
+ m_pullRequest->removePullMsgEvent();
+
+ /*LOG_INFO("OFFSET_ILLEGAL:%s,nextBeginOffset:%lld",
+ (m_pullRequest->m_messageQueue).toString().c_str(),
+ result.nextBeginOffset);*/
+ break;
+ }
+ case BROKER_TIMEOUT: { // as BROKER_TIMEOUT is defined by client, broker
+ // will not returns this status, so this case
+ // could not be entered.
+ LOG_ERROR("impossible BROKER_TIMEOUT Occurs");
+ m_pullRequest->setNextOffset(result.nextBeginOffset);
+ if (bProducePullRequest)
+ m_callbackOwner->producePullMsgTask(m_pullRequest);
+ else
+ m_pullRequest->removePullMsgEvent();
+ break;
+ }
+ }
+ }
+
+ virtual void onException(MQException &e) {
+ if (m_bShutdown == true) {
+ LOG_INFO("pullrequest for:%s in shutdown, return",
+ (m_pullRequest->m_messageQueue).toString().c_str());
+ m_pullRequest->removePullMsgEvent();
+ return;
+ }
+ LOG_WARN("pullrequest for:%s occurs exception, reproduce it",
+ (m_pullRequest->m_messageQueue).toString().c_str());
+ m_callbackOwner->producePullMsgTask(m_pullRequest);
+ }
+
+ void setShutdownStatus() { m_bShutdown = true; }
+
+ private:
+ DefaultMQPushConsumer *m_callbackOwner;
+ PullRequest *m_pullRequest;
+ bool m_bShutdown;
+ };
+
+//<!***************************************************************************
+ static boost::mutex m_asyncCallbackLock;
+
+ DefaultMQPushConsumer::DefaultMQPushConsumer(const string &groupname)
+ : m_consumeFromWhere(CONSUME_FROM_LAST_OFFSET),
+ m_pOffsetStore(NULL),
+ m_pPullAPIWrapper(NULL),
+ m_pMessageListener(NULL),
+ m_consumeMessageBatchMaxSize(1),
+ m_maxMsgCacheSize(1000) {
+ //<!set default group name;
+ string gname = groupname.empty() ? DEFAULT_CONSUMER_GROUP : groupname;
+ setGroupName(gname);
+ m_asyncPull = true;
+ m_asyncPullTimeout = 30 * 1000;
+ setMessageModel(CLUSTERING);
+
+ m_startTime = UtilAll::currentTimeMillis();
+ m_consumeThreadCount = boost::thread::hardware_concurrency();
+ m_pullMsgThreadPoolNum = boost::thread::hardware_concurrency();
+ m_async_service_thread.reset(new boost::thread(
+ boost::bind(&DefaultMQPushConsumer::boost_asio_work, this)));
+ }
+
+ void DefaultMQPushConsumer::boost_asio_work() {
+ LOG_INFO("DefaultMQPushConsumer::boost asio async service runing");
+ boost::asio::io_service::work work(m_async_ioService); // avoid async io
+ // service stops after
+ // first timer timeout
+ // callback
+ m_async_ioService.run();
+ }
+
+ DefaultMQPushConsumer::~DefaultMQPushConsumer() {
+ m_pMessageListener = NULL;
+ if (m_pullmsgQueue != NULL) {
+ deleteAndZero(m_pullmsgQueue);
+ }
+ if (m_pRebalance != NULL) {
+ deleteAndZero(m_pRebalance);
+ }
+ if (m_pOffsetStore != NULL) {
+ deleteAndZero(m_pOffsetStore);
+ }
+ if (m_pPullAPIWrapper != NULL) {
+ deleteAndZero(m_pPullAPIWrapper);
+ }
+ if (m_consumerService != NULL) {
+ deleteAndZero(m_consumerService);
+ }
+ PullMAP::iterator it = m_PullCallback.begin();
+ for (; it != m_PullCallback.end(); ++it) {
+ deleteAndZero(it->second);
+ }
+ m_PullCallback.clear();
+ m_subTopics.clear();
+ }
+
+ void DefaultMQPushConsumer::sendMessageBack(MQMessageExt &msg, int delayLevel) {
+ try {
+ getFactory()->getMQClientAPIImpl()->consumerSendMessageBack(
+ msg, getGroupName(), delayLevel, 3000, getSessionCredentials());
+ } catch (MQException &e) {
+ LOG_ERROR(e.what());
+ }
+ }
+
+ void DefaultMQPushConsumer::fetchSubscribeMessageQueues(
+ const string &topic, vector<MQMessageQueue> &mqs) {
+ mqs.clear();
+ try {
+ getFactory()->fetchSubscribeMessageQueues(topic, mqs,
+ getSessionCredentials());
+ } catch (MQException &e) {
+ LOG_ERROR(e.what());
+ }
+ }
+
+ void DefaultMQPushConsumer::doRebalance() {
+ if (isServiceStateOk()) {
+ try {
+ m_pRebalance->doRebalance();
+ } catch (MQException &e) {
+ LOG_ERROR(e.what());
+ }
+ }
+ }
+
+ void DefaultMQPushConsumer::persistConsumerOffset() {
+ if (isServiceStateOk()) {
+ m_pRebalance->persistConsumerOffset();
+ }
+ }
+
+ void DefaultMQPushConsumer::persistConsumerOffsetByResetOffset() {
+ if (isServiceStateOk()) {
+ m_pRebalance->persistConsumerOffsetByResetOffset();
+ }
+ }
+
+ void DefaultMQPushConsumer::start() {
+#ifndef WIN32
+ /* Ignore the SIGPIPE */
+ struct sigaction sa;
+ memset(&sa,0, sizeof(struct sigaction));
+ sa.sa_handler = SIG_IGN;
+ sa.sa_flags = 0;
+ sigaction(SIGPIPE, &sa, 0);
+#endif
+ switch (m_serviceState) {
+ case CREATE_JUST: {
+ m_serviceState = START_FAILED;
+ MQClient::start();
+ LOG_INFO("DefaultMQPushConsumer:%s start", m_GroupName.c_str());
+
+ //<!data;
+ checkConfig();
+
+ //<!create rebalance;
+ m_pRebalance = new RebalancePush(this, getFactory());
+
+ string groupname = getGroupName();
+ m_pPullAPIWrapper = new PullAPIWrapper(getFactory(), groupname);
+
+ if (m_pMessageListener) {
+ if (m_pMessageListener->getMessageListenerType() ==
+ messageListenerOrderly) {
+ LOG_INFO("start orderly consume service:%s", getGroupName().c_str());
+ m_consumerService = new ConsumeMessageOrderlyService(
+ this, m_consumeThreadCount, m_pMessageListener);
+ } else // for backward compatible, defaultly and concurrently listeners
+ // are allocating ConsumeMessageConcurrentlyService
+ {
+ LOG_INFO("start concurrently consume service:%s",
+ getGroupName().c_str());
+ m_consumerService = new ConsumeMessageConcurrentlyService(
+ this, m_consumeThreadCount, m_pMessageListener);
+ }
+ }
+
+ m_pullmsgQueue = new TaskQueue(m_pullMsgThreadPoolNum);
+ m_pullmsgThread.reset(new boost::thread(boost::bind(
+ &DefaultMQPushConsumer::runPullMsgQueue, this, m_pullmsgQueue)));
+
+ copySubscription();
+
+ //<! registe;
+ bool registerOK = getFactory()->registerConsumer(this);
+ if (!registerOK) {
+ m_serviceState = CREATE_JUST;
+ THROW_MQEXCEPTION(
+ MQClientException,
+ "The cousumer group[" + getGroupName() +
+ "] has been created before, specify another name please.",
+ -1);
+ }
+
+ //<!msg model;
+ switch (getMessageModel()) {
+ case BROADCASTING:
+ m_pOffsetStore = new LocalFileOffsetStore(groupname, getFactory());
+ break;
+ case CLUSTERING:
+ m_pOffsetStore = new RemoteBrokerOffsetStore(groupname, getFactory());
+ break;
+ }
+ bool bStartFailed = false;
+ string errorMsg;
+ try {
+ m_pOffsetStore->load();
+ } catch (MQClientException &e) {
+ bStartFailed = true;
+ errorMsg = std::string(e.what());
+ }
+ m_consumerService->start();
+
+ getFactory()->start();
+
+ updateTopicSubscribeInfoWhenSubscriptionChanged();
+ getFactory()->sendHeartbeatToAllBroker();
+
+ m_serviceState = RUNNING;
+ if (bStartFailed) {
+ shutdown();
+ THROW_MQEXCEPTION(MQClientException, errorMsg, -1);
+ }
+ break;
+ }
+ case RUNNING:
+ case START_FAILED:
+ case SHUTDOWN_ALREADY:
+ break;
+ default:
+ break;
+ }
+
+ getFactory()->rebalanceImmediately();
+ }
+
+ void DefaultMQPushConsumer::shutdown() {
+ switch (m_serviceState) {
+ case RUNNING: {
+ LOG_INFO("DefaultMQPushConsumer shutdown");
+ m_async_ioService.stop();
+ m_async_service_thread->interrupt();
+ m_async_service_thread->join();
+ m_pullmsgQueue->close();
+ m_pullmsgThread->interrupt();
+ m_pullmsgThread->join();
+ m_consumerService->shutdown();
+ persistConsumerOffset();
+ shutdownAsyncPullCallBack(); // delete aync pullMsg resources
+ getFactory()->unregisterConsumer(this);
+ getFactory()->shutdown();
+ m_serviceState = SHUTDOWN_ALREADY;
+ break;
+ }
+ case CREATE_JUST:
+ case SHUTDOWN_ALREADY:
+ break;
+ default:
+ break;
+ }
+ }
+
+ void DefaultMQPushConsumer::registerMessageListener(
+ MQMessageListener *pMessageListener) {
+ if (NULL != pMessageListener) {
+ m_pMessageListener = pMessageListener;
+ }
+ }
+
+ MessageListenerType DefaultMQPushConsumer::getMessageListenerType() {
+ if (NULL != m_pMessageListener) {
+ return m_pMessageListener->getMessageListenerType();
+ }
+ return messageListenerDefaultly;
+ }
+
+ ConsumeMsgService *DefaultMQPushConsumer::getConsumerMsgService() const {
+ return m_consumerService;
+ }
+
+ OffsetStore *DefaultMQPushConsumer::getOffsetStore() const {
+ return m_pOffsetStore;
+ }
+
+ Rebalance *DefaultMQPushConsumer::getRebalance() const { return m_pRebalance; }
+
+ void DefaultMQPushConsumer::subscribe(const string &topic,
+ const string &subExpression) {
+ m_subTopics[topic] = subExpression;
+ }
+
+ void DefaultMQPushConsumer::checkConfig() {
+ string groupname = getGroupName();
+ // check consumerGroup
+ Validators::checkGroup(groupname);
+
+ // consumerGroup
+ if (!groupname.compare(DEFAULT_CONSUMER_GROUP)) {
+ THROW_MQEXCEPTION(MQClientException,
+ "consumerGroup can not equal DEFAULT_CONSUMER", -1);
+ }
+
+ if (getMessageModel() != BROADCASTING && getMessageModel() != CLUSTERING) {
+ THROW_MQEXCEPTION(MQClientException, "messageModel is valid ", -1);
+ }
+
+ if (m_pMessageListener == NULL) {
+ THROW_MQEXCEPTION(MQClientException, "messageListener is null ", -1);
+ }
+ }
+
+ void DefaultMQPushConsumer::copySubscription() {
+ map<string, string>::iterator it = m_subTopics.begin();
+ for (; it != m_subTopics.end(); ++it) {
+ LOG_INFO("buildSubscriptionData,:%s,%s", it->first.c_str(),
+ it->second.c_str());
+ unique_ptr<SubscriptionData> pSData(
+ FilterAPI::buildSubscriptionData(it->first, it->second));
+
+ m_pRebalance->setSubscriptionData(it->first, pSData.release());
+ }
+
+ switch (getMessageModel()) {
+ case BROADCASTING:
+ break;
+ case CLUSTERING: {
+ string retryTopic = UtilAll::getRetryTopic(getGroupName());
+
+ //<!this sub;
+ unique_ptr<SubscriptionData> pSData(
+ FilterAPI::buildSubscriptionData(retryTopic, SUB_ALL));
+
+ m_pRebalance->setSubscriptionData(retryTopic, pSData.release());
+ break;
+ }
+ default:
+ break;
+ }
+ }
+
+ void DefaultMQPushConsumer::updateTopicSubscribeInfo(
+ const string &topic, vector<MQMessageQueue> &info) {
+ m_pRebalance->setTopicSubscribeInfo(topic, info);
+ }
+
+ void DefaultMQPushConsumer::updateTopicSubscribeInfoWhenSubscriptionChanged() {
+ map<string, SubscriptionData *> &subTable =
+ m_pRebalance->getSubscriptionInner();
+ map<string, SubscriptionData *>::iterator it = subTable.begin();
+ for (; it != subTable.end(); ++it) {
+ bool btopic = getFactory()->updateTopicRouteInfoFromNameServer(
+ it->first, getSessionCredentials());
+ if (btopic == false) {
+ LOG_WARN("The topic:[%s] not exist", it->first.c_str());
+ }
+ }
+ }
+
+ ConsumeType DefaultMQPushConsumer::getConsumeType() {
+ return CONSUME_PASSIVELY;
+ }
+
+ ConsumeFromWhere DefaultMQPushConsumer::getConsumeFromWhere() {
+ return m_consumeFromWhere;
+ }
+
+ void DefaultMQPushConsumer::setConsumeFromWhere(
+ ConsumeFromWhere consumeFromWhere) {
+ m_consumeFromWhere = consumeFromWhere;
+ }
+
+ void DefaultMQPushConsumer::getSubscriptions(vector<SubscriptionData> &result) {
+ map<string, SubscriptionData *> &subTable =
+ m_pRebalance->getSubscriptionInner();
+ map<string, SubscriptionData *>::iterator it = subTable.begin();
+ for (; it != subTable.end(); ++it) {
+ result.push_back(*(it->second));
+ }
+ }
+
+ void DefaultMQPushConsumer::updateConsumeOffset(const MQMessageQueue &mq,
+ int64 offset) {
+ if (offset >= 0) {
+ m_pOffsetStore->updateOffset(mq, offset);
+ } else {
+ LOG_ERROR("updateConsumeOffset of mq:%s error", mq.toString().c_str());
+ }
+ }
+
+ void DefaultMQPushConsumer::removeConsumeOffset(const MQMessageQueue &mq) {
+ m_pOffsetStore->removeOffset(mq);
+ }
+
+ void DefaultMQPushConsumer::triggerNextPullRequest(
+ boost::asio::deadline_timer *t, PullRequest *request) {
+ // LOG_INFO("trigger pullrequest for:%s",
+ // (request->m_messageQueue).toString().c_str());
+ producePullMsgTask(request);
+ deleteAndZero(t);
+ }
+
+ void DefaultMQPushConsumer::producePullMsgTask(PullRequest *request) {
+ if (m_pullmsgQueue->bTaskQueueStatusOK() && isServiceStateOk()) {
+ request->addPullMsgEvent();
+ if (m_asyncPull) {
+ m_pullmsgQueue->produce(TaskBinder::gen(
+ &DefaultMQPushConsumer::pullMessageAsync, this, request));
+ } else {
+ m_pullmsgQueue->produce(
+ TaskBinder::gen(&DefaultMQPushConsumer::pullMessage, this, request));
+ }
+ } else {
+ LOG_WARN("produce pullmsg of mq:%s failed",
+ request->m_messageQueue.toString().c_str());
+ }
+ }
+
+ void DefaultMQPushConsumer::runPullMsgQueue(TaskQueue *pTaskQueue) {
+ pTaskQueue->run();
+ }
+
+ void DefaultMQPushConsumer::pullMessage(PullRequest *request) {
+ if (request == NULL) {
+ LOG_ERROR("Pull request is NULL, return");
+ return;
+ }
+ if (request->isDroped()) {
+ LOG_WARN("Pull request is set drop with mq:%s, return",
+ (request->m_messageQueue).toString().c_str());
+ request->removePullMsgEvent();
+ return;
+ }
+
+ MQMessageQueue &messageQueue = request->m_messageQueue;
+ if (m_consumerService->getConsumeMsgSerivceListenerType() ==
+ messageListenerOrderly) {
+ if (!request->isLocked() || request->isLockExpired()) {
+ if (!m_pRebalance->lock(messageQueue)) {
+ producePullMsgTask(request);
+ return;
+ }
+ }
+ }
+
+ if (request->getCacheMsgCount() > m_maxMsgCacheSize) {
+ // LOG_INFO("retry pullrequest for:%s after 1s, as cachMsgSize:%d is larger
+ // than:%d", (request->m_messageQueue).toString().c_str(),
+ // request->getCacheMsgCount(), m_maxMsgCacheSize);
+ boost::asio::deadline_timer *t = new boost::asio::deadline_timer(
+ m_async_ioService, boost::posix_time::milliseconds(1 * 1000));
+ t->async_wait(boost::bind(&DefaultMQPushConsumer::triggerNextPullRequest,
+ this, t, request));
+ return;
+ }
+
+ bool commitOffsetEnable = false;
+ int64 commitOffsetValue = 0;
+ if (CLUSTERING == getMessageModel()) {
+ commitOffsetValue = m_pOffsetStore->readOffset(
+ messageQueue, READ_FROM_MEMORY, getSessionCredentials());
+ if (commitOffsetValue > 0) {
+ commitOffsetEnable = true;
+ }
+ }
+
+ string subExpression;
+ SubscriptionData *pSdata =
+ m_pRebalance->getSubscriptionData(messageQueue.getTopic());
+ if (pSdata == NULL) {
+ producePullMsgTask(request);
+ return;
+ }
+ subExpression = pSdata->getSubString();
+
+ int sysFlag =
+ PullSysFlag::buildSysFlag(commitOffsetEnable, // commitOffset
+ false, // suspend
+ !subExpression.empty(), // subscription
+ false); // class filter
+
+ try {
+ request->setLastPullTimestamp(UtilAll::currentTimeMillis());
+ unique_ptr<PullResult> result(
+ m_pPullAPIWrapper->pullKernelImpl(messageQueue, // 1
+ subExpression, // 2
+ pSdata->getSubVersion(), // 3
+ request->getNextOffset(), // 4
+ 32, // 5
+ sysFlag, // 6
+ commitOffsetValue, // 7
+ 1000 * 15, // 8
+ 1000 * 30, // 9
+ ComMode_SYNC, // 10
+ NULL, getSessionCredentials()));
+
+ PullResult pullResult = m_pPullAPIWrapper->processPullResult(
+ messageQueue, result.get(), pSdata);
+
+ switch (pullResult.pullStatus) {
+ case FOUND: {
+ if (!request->isDroped()) // if request is setted to dropped, don't add
+ // msgFoundList to m_msgTreeMap and don't
+ // call producePullMsgTask
+ { // avoid issue: pullMsg is sent out, rebalance is doing concurrently
+ // and this request is dropped, and then received pulled msgs.
+ request->setNextOffset(pullResult.nextBeginOffset);
+ request->putMessage(pullResult.msgFoundList);
+
+ m_consumerService->submitConsumeRequest(request,
+ pullResult.msgFoundList);
+ producePullMsgTask(request);
+
+ LOG_DEBUG("FOUND:%s with size:"
+ SIZET_FMT
+ ",nextBeginOffset:%lld",
+ messageQueue.toString().c_str(),
+ pullResult.msgFoundList.size(), pullResult.nextBeginOffset);
+ } else {
+ request->removePullMsgEvent();
+ }
+ break;
+ }
+ case NO_NEW_MSG: {
+ request->setNextOffset(pullResult.nextBeginOffset);
+ vector<MQMessageExt> msgs;
+ request->getMessage(msgs);
+ if ((msgs.size() == 0) && (pullResult.nextBeginOffset > 0)) {
+ /*if broker losted/cleared msgs of one msgQueue, but the brokerOffset
+ is kept, then consumer will enter following situation:
+ 1>. get pull offset with 0 when do rebalance, and set
+ m_offsetTable[mq] to 0;
+ 2>. NO_NEW_MSG or NO_MATCHED_MSG got when pullMessage, and nextBegin
+ offset increase by 800
+ 3>. request->getMessage(msgs) always NULL
+ 4>. we need update consumerOffset to nextBeginOffset indicated by
+ broker
+ but if really no new msg could be pulled, also go to this CASE
+ */
+ // LOG_DEBUG("maybe misMatch between broker and client happens, update
+ // consumerOffset to nextBeginOffset indicated by broker");
+ updateConsumeOffset(messageQueue, pullResult.nextBeginOffset);
+ }
+ producePullMsgTask(request);
+ LOG_DEBUG("NO_NEW_MSG:%s,nextBeginOffset:%lld",
+ messageQueue.toString().c_str(), pullResult.nextBeginOffset);
+ break;
+ }
+ case NO_MATCHED_MSG: {
+ request->setNextOffset(pullResult.nextBeginOffset);
+ vector<MQMessageExt> msgs;
+ request->getMessage(msgs);
+ if ((msgs.size() == 0) && (pullResult.nextBeginOffset > 0)) {
+ // LOG_DEBUG("maybe misMatch between broker and client happens, update
+ // consumerOffset to nextBeginOffset indicated by broker");
+ updateConsumeOffset(messageQueue, pullResult.nextBeginOffset);
+ }
+ producePullMsgTask(request);
+
+ LOG_DEBUG("NO_MATCHED_MSG:%s,nextBeginOffset:%lld",
+ messageQueue.toString().c_str(), pullResult.nextBeginOffset);
+ break;
+ }
+ case OFFSET_ILLEGAL: {
+ request->setNextOffset(pullResult.nextBeginOffset);
+ producePullMsgTask(request);
+
+ LOG_DEBUG("OFFSET_ILLEGAL:%s,nextBeginOffset:%lld",
+ messageQueue.toString().c_str(), pullResult.nextBeginOffset);
+ break;
+ }
+ case BROKER_TIMEOUT: { // as BROKER_TIMEOUT is defined by client, broker
+ // will not returns this status, so this case
+ // could not be entered.
+ LOG_ERROR("impossible BROKER_TIMEOUT Occurs");
+ request->setNextOffset(pullResult.nextBeginOffset);
+ producePullMsgTask(request);
+ break;
+ }
+ }
+ } catch (MQException &e) {
+ LOG_ERROR(e.what());
+ producePullMsgTask(request);
+ }
+ }
+
+ AsyncPullCallback *DefaultMQPushConsumer::getAsyncPullCallBack(
+ PullRequest *request, MQMessageQueue msgQueue) {
+ boost::lock_guard<boost::mutex> lock(m_asyncCallbackLock);
+ if (m_asyncPull && request) {
+ PullMAP::iterator it = m_PullCallback.find(msgQueue);
+ if (it == m_PullCallback.end()) {
+ LOG_INFO("new pull callback for mq:%s", msgQueue.toString().c_str());
+ m_PullCallback[msgQueue] = new AsyncPullCallback(this, request);
+ }
+ return m_PullCallback[msgQueue];
+ }
+
+ return NULL;
+ }
+
+ void DefaultMQPushConsumer::shutdownAsyncPullCallBack() {
+ boost::lock_guard<boost::mutex> lock(m_asyncCallbackLock);
+ if (m_asyncPull) {
+ PullMAP::iterator it = m_PullCallback.begin();
+ for (; it != m_PullCallback.end(); ++it) {
+ if (it->second) {
+ it->second->setShutdownStatus();
+ } else {
+ LOG_ERROR("could not find asyncPullCallback for:%s",
+ it->first.toString().c_str());
+ }
+ }
+ }
+ }
+
+ void DefaultMQPushConsumer::pullMessageAsync(PullRequest *request) {
+ if (request == NULL) {
+ LOG_ERROR("Pull request is NULL, return");
+ return;
+ }
+ if (request->isDroped()) {
+ LOG_WARN("Pull request is set drop with mq:%s, return",
+ (request->m_messageQueue).toString().c_str());
+ request->removePullMsgEvent();
+ return;
+ }
+
+ MQMessageQueue &messageQueue = request->m_messageQueue;
+ if (m_consumerService->getConsumeMsgSerivceListenerType() ==
+ messageListenerOrderly) {
+ if (!request->isLocked() || request->isLockExpired()) {
+ if (!m_pRebalance->lock(messageQueue)) {
+ producePullMsgTask(request);
+ return;
+ }
+ }
+ }
+
+ if (request->getCacheMsgCount() > m_maxMsgCacheSize) {
+ // LOG_INFO("retry pullrequest for:%s after 1s, as cachMsgSize:%d is larger
+ // than:%d", (request->m_messageQueue).toString().c_str(),
+ // request->getCacheMsgCount(), m_maxMsgCacheSize);
+ boost::asio::deadline_timer *t = new boost::asio::deadline_timer(
+ m_async_ioService, boost::posix_time::milliseconds(1 * 1000));
+ t->async_wait(boost::bind(&DefaultMQPushConsumer::triggerNextPullRequest,
+ this, t, request));
+ return;
+ }
+
+ bool commitOffsetEnable = false;
+ int64 commitOffsetValue = 0;
+ if (CLUSTERING == getMessageModel()) {
+ commitOffsetValue = m_pOffsetStore->readOffset(
+ messageQueue, READ_FROM_MEMORY, getSessionCredentials());
+ if (commitOffsetValue > 0) {
+ commitOffsetEnable = true;
+ }
+ }
+
+ string subExpression;
+ SubscriptionData *pSdata =
+ (m_pRebalance->getSubscriptionData(messageQueue.getTopic()));
+ if (pSdata == NULL) {
+ producePullMsgTask(request);
+ return;
+ }
+ subExpression = pSdata->getSubString();
+
+ int sysFlag =
+ PullSysFlag::buildSysFlag(commitOffsetEnable, // commitOffset
+ true, // suspend
+ !subExpression.empty(), // subscription
+ false); // class filter
+
+ AsyncArg arg;
+ arg.mq = messageQueue;
+ arg.subData = *pSdata;
+ arg.pPullWrapper = m_pPullAPIWrapper;
+
+ try {
+ request->setLastPullTimestamp(UtilAll::currentTimeMillis());
+ m_pPullAPIWrapper->pullKernelImpl(
+ messageQueue, // 1
+ subExpression, // 2
+ pSdata->getSubVersion(), // 3
+ request->getNextOffset(), // 4
+ 32, // 5
+ sysFlag, // 6
+ commitOffsetValue, // 7
+ 1000 * 15, // 8
+ m_asyncPullTimeout, // 9
+ ComMode_ASYNC, // 10
+ getAsyncPullCallBack(request, messageQueue), // 11
+ getSessionCredentials(), // 12
+ &arg); // 13
+ } catch (MQException &e) {
+ LOG_ERROR(e.what());
+ producePullMsgTask(request);
+ }
+ }
+
+ void DefaultMQPushConsumer::setAsyncPull(bool asyncFlag) {
+ if (asyncFlag) {
+ LOG_INFO("set pushConsumer:%s to async default pull mode",
+ getGroupName().c_str());
+ } else {
+ LOG_INFO("set pushConsumer:%s to sync pull mode", getGroupName().c_str());
+ }
+ m_asyncPull = asyncFlag;
+ }
+
+ void DefaultMQPushConsumer::setConsumeThreadCount(int threadCount) {
+ if (threadCount > 0) {
+ m_consumeThreadCount = threadCount;
+ } else {
+ LOG_ERROR("setConsumeThreadCount with invalid value");
+ }
+ }
+
+ int DefaultMQPushConsumer::getConsumeThreadCount() const {
+ return m_consumeThreadCount;
+ }
+
+ void DefaultMQPushConsumer::setPullMsgThreadPoolCount(int threadCount) {
+ m_pullMsgThreadPoolNum = threadCount;
+ }
+
+ int DefaultMQPushConsumer::getPullMsgThreadPoolCount() const {
+ return m_pullMsgThreadPoolNum;
+ }
+
+ int DefaultMQPushConsumer::getConsumeMessageBatchMaxSize() const {
+ return m_consumeMessageBatchMaxSize;
+ }
+
+ void DefaultMQPushConsumer::setConsumeMessageBatchMaxSize(
+ int consumeMessageBatchMaxSize) {
+ if (consumeMessageBatchMaxSize >= 1)
+ m_consumeMessageBatchMaxSize = consumeMessageBatchMaxSize;
+ }
+
+ void DefaultMQPushConsumer::setMaxCacheMsgSizePerQueue(int maxCacheSize) {
+ if (maxCacheSize > 0 && maxCacheSize < 65535) {
+ LOG_INFO("set maxCacheSize to:%d for consumer:%s", maxCacheSize,
+ getGroupName().c_str());
+ m_maxMsgCacheSize = maxCacheSize;
+ }
+ }
+
+ int DefaultMQPushConsumer::getMaxCacheMsgSizePerQueue() const {
+ return m_maxMsgCacheSize;
+ }
+
+ ConsumerRunningInfo *DefaultMQPushConsumer::getConsumerRunningInfo() {
+ ConsumerRunningInfo *info = new ConsumerRunningInfo();
+ if (info) {
+ if (m_consumerService->getConsumeMsgSerivceListenerType() ==
+ messageListenerOrderly)
+ info->setProperty(ConsumerRunningInfo::PROP_CONSUME_ORDERLY, "true");
+ else
+ info->setProperty(ConsumerRunningInfo::PROP_CONSUME_ORDERLY, "flase");
+ info->setProperty(ConsumerRunningInfo::PROP_THREADPOOL_CORE_SIZE,
+ UtilAll::to_string(m_consumeThreadCount));
+ info->setProperty(ConsumerRunningInfo::PROP_CONSUMER_START_TIMESTAMP,
+ UtilAll::to_string(m_startTime));
+
+ vector<SubscriptionData> result;
+ getSubscriptions(result);
+ info->setSubscriptionSet(result);
+
+ map<MQMessageQueue, PullRequest *> requestTable =
+ m_pRebalance->getPullRequestTable();
+ map<MQMessageQueue, PullRequest *>::iterator it = requestTable.begin();
+
+ for (; it != requestTable.end(); ++it) {
+ if (!it->second->isDroped()) {
+ map<MessageQueue, ProcessQueueInfo> queueTable;
+ MessageQueue queue((it->first).getTopic(), (it->first).getBrokerName(),
+ (it->first).getQueueId());
+ ProcessQueueInfo processQueue;
+ processQueue.cachedMsgMinOffset = it->second->getCacheMinOffset();
+ processQueue.cachedMsgMaxOffset = it->second->getCacheMaxOffset();
+ processQueue.cachedMsgCount = it->second->getCacheMsgCount();
+ processQueue.setCommitOffset(m_pOffsetStore->readOffset(
+ it->first, MEMORY_FIRST_THEN_STORE, getSessionCredentials()));
+ processQueue.setDroped(it->second->isDroped());
+ processQueue.setLocked(it->second->isLocked());
+ processQueue.lastLockTimestamp = it->second->getLastLockTimestamp();
+ processQueue.lastPullTimestamp = it->second->getLastPullTimestamp();
+ processQueue.lastConsumeTimestamp =
+ it->second->getLastConsumeTimestamp();
+ info->setMqTable(queue, processQueue);
+ }
+ }
+
+ return info;
+ }
+ return NULL;
+ }
+
+//<!************************************************************************
+} //<!end namespace;
diff --git a/src/consumer/PullRequest.h b/src/consumer/PullRequest.h
index c62bd7d..f7abfaf 100644
--- a/src/consumer/PullRequest.h
+++ b/src/consumer/PullRequest.h
@@ -85,7 +85,7 @@
map<int64, MQMessageExt> m_msgTreeMapTemp;
boost::mutex m_pullRequestLock;
uint64 m_lastLockTimestamp; // ms
- uint64 m_tryUnlockTimes;
+ //uint64 m_tryUnlockTimes;
uint64 m_lastPullTimestamp;
uint64 m_lastConsumeTimestamp;
boost::timed_mutex m_consumeLock;
diff --git a/src/producer/DefaultMQProducer.cpp b/src/producer/DefaultMQProducer.cpp
index 1411ebd..f0b53cf 100755
--- a/src/producer/DefaultMQProducer.cpp
+++ b/src/producer/DefaultMQProducer.cpp
@@ -37,7 +37,7 @@
: m_sendMsgTimeout(3000),
m_compressMsgBodyOverHowmuch(4 * 1024),
m_maxMessageSize(1024 * 128),
- m_retryAnotherBrokerWhenNotStoreOK(false),
+ //m_retryAnotherBrokerWhenNotStoreOK(false),
m_compressLevel(5),
m_retryTimes(5) {
//<!set default group name;
@@ -50,7 +50,8 @@
void DefaultMQProducer::start() {
#ifndef WIN32
/* Ignore the SIGPIPE */
- struct sigaction sa = {0};
+ struct sigaction sa;
+ memset(&sa,0, sizeof(struct sigaction));
sa.sa_handler = SIG_IGN;
sa.sa_flags = 0;
sigaction(SIGPIPE, &sa, 0);
diff --git a/src/thread/disruptor/sequence.h b/src/thread/disruptor/sequence.h
index b3e5876..4fb61a4 100755
--- a/src/thread/disruptor/sequence.h
+++ b/src/thread/disruptor/sequence.h
@@ -102,9 +102,9 @@
PaddedSequence(int64_t initial_value = kInitialCursorValue) :
Sequence(initial_value) {}
- private:
+ //private:
// padding
- int64_t padding_[ATOMIC_SEQUENCE_PADDING_LENGTH];
+ //int64_t padding_[ATOMIC_SEQUENCE_PADDING_LENGTH];
};
@@ -133,8 +133,8 @@
public:
PaddedLong(int64_t initial_value = kInitialCursorValue) :
MutableLong(initial_value) {}
- private:
- int64_t padding_[SEQUENCE_PADDING_LENGTH];
+ //private:
+ //int64_t padding_[SEQUENCE_PADDING_LENGTH];
};
int64_t GetMinimumSequence(
diff --git a/src/transport/TcpTransport.cpp b/src/transport/TcpTransport.cpp
index 62cc19e..011c420 100644
--- a/src/transport/TcpTransport.cpp
+++ b/src/transport/TcpTransport.cpp
@@ -30,12 +30,12 @@
TcpTransport::TcpTransport(TcpRemotingClient *pTcpRemointClient,
READ_CALLBACK handle /* = NULL */)
: m_tcpConnectStatus(e_connectInit),
- m_ReadDatathread(NULL),
- m_readcallback(handle),
- m_tcpRemotingClient(pTcpRemointClient),
m_event_base_status(false),
m_event_base_mtx(),
- m_event_base_cv() {
+ m_event_base_cv(),
+ m_ReadDatathread(NULL),
+ m_readcallback(handle),
+ m_tcpRemotingClient(pTcpRemointClient){
m_startTime = UtilAll::currentTimeMillis();
#ifdef WIN32
evthread_use_windows_threads();