/* | |
* Licensed to the Apache Software Foundation (ASF) under one or more | |
* contributor license agreements. See the NOTICE file distributed with | |
* this work for additional information regarding copyright ownership. | |
* The ASF licenses this file to You under the Apache License, Version 2.0 | |
* (the "License"); you may not use this file except in compliance with | |
* the License. You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
#include "DefaultMQPushConsumer.h" | |
#include "CommunicationMode.h" | |
#include "ConsumeMsgService.h" | |
#include "ConsumerRunningInfo.h" | |
#include "FilterAPI.h" | |
#include "Logging.h" | |
#include "MQClientAPIImpl.h" | |
#include "MQClientFactory.h" | |
#include "MQClientManager.h" | |
#include "MQProtos.h" | |
#include "OffsetStore.h" | |
#include "PullAPIWrapper.h" | |
#include "PullSysFlag.h" | |
#include "Rebalance.h" | |
#include "UtilAll.h" | |
#include "Validators.h" | |
#include "task_queue.h" | |
namespace rocketmq { | |
class AsyncPullCallback : public PullCallback { | |
public: | |
AsyncPullCallback(DefaultMQPushConsumer *pushConsumer, PullRequest *request) | |
: m_callbackOwner(pushConsumer), | |
m_pullRequest(request), | |
m_bShutdown(false) {} | |
virtual ~AsyncPullCallback() { | |
m_callbackOwner = NULL; | |
m_pullRequest = NULL; | |
} | |
virtual void onSuccess(MQMessageQueue &mq, PullResult &result, | |
bool bProducePullRequest) { | |
if (m_bShutdown == true) { | |
LOG_INFO("pullrequest for:%s in shutdown, return", | |
(m_pullRequest->m_messageQueue).toString().c_str()); | |
m_pullRequest->removePullMsgEvent(); | |
return; | |
} | |
switch (result.pullStatus) { | |
case FOUND: { | |
if (!m_pullRequest->isDroped()) // if request is setted to dropped, | |
// don't add msgFoundList to | |
// m_msgTreeMap and don't call | |
// producePullMsgTask | |
{ // avoid issue: pullMsg is sent out, rebalance is doing concurrently | |
// and this request is dropped, and then received pulled msgs. | |
m_pullRequest->setNextOffset(result.nextBeginOffset); | |
m_pullRequest->putMessage(result.msgFoundList); | |
m_callbackOwner->getConsumerMsgService()->submitConsumeRequest( | |
m_pullRequest, result.msgFoundList); | |
if (bProducePullRequest) | |
m_callbackOwner->producePullMsgTask(m_pullRequest); | |
else | |
m_pullRequest->removePullMsgEvent(); | |
LOG_DEBUG("FOUND:%s with size:" | |
SIZET_FMT | |
", nextBeginOffset:%lld", | |
(m_pullRequest->m_messageQueue).toString().c_str(), | |
result.msgFoundList.size(), result.nextBeginOffset); | |
} else { | |
LOG_INFO("remove pullmsg event of mq:%s", | |
(m_pullRequest->m_messageQueue).toString().c_str()); | |
m_pullRequest->removePullMsgEvent(); | |
} | |
break; | |
} | |
case NO_NEW_MSG: { | |
m_pullRequest->setNextOffset(result.nextBeginOffset); | |
vector<MQMessageExt> msgs; | |
m_pullRequest->getMessage(msgs); | |
if ((msgs.size() == 0) && (result.nextBeginOffset > 0)) { | |
/*if broker losted/cleared msgs of one msgQueue, but the brokerOffset | |
is kept, then consumer will enter following situation: | |
1>. get pull offset with 0 when do rebalance, and set | |
m_offsetTable[mq] to 0; | |
2>. NO_NEW_MSG or NO_MATCHED_MSG got when pullMessage, and nextBegin | |
offset increase by 800 | |
3>. request->getMessage(msgs) always NULL | |
4>. we need update consumerOffset to nextBeginOffset indicated by | |
broker | |
but if really no new msg could be pulled, also go to this CASE | |
LOG_INFO("maybe misMatch between broker and client happens, update | |
consumerOffset to nextBeginOffset indicated by broker");*/ | |
m_callbackOwner->updateConsumeOffset(m_pullRequest->m_messageQueue, | |
result.nextBeginOffset); | |
} | |
if (bProducePullRequest) | |
m_callbackOwner->producePullMsgTask(m_pullRequest); | |
else | |
m_pullRequest->removePullMsgEvent(); | |
/*LOG_INFO("NO_NEW_MSG:%s,nextBeginOffset:%lld", | |
(m_pullRequest->m_messageQueue).toString().c_str(), | |
result.nextBeginOffset);*/ | |
break; | |
} | |
case NO_MATCHED_MSG: { | |
m_pullRequest->setNextOffset(result.nextBeginOffset); | |
vector<MQMessageExt> msgs; | |
m_pullRequest->getMessage(msgs); | |
if ((msgs.size() == 0) && (result.nextBeginOffset > 0)) { | |
/*if broker losted/cleared msgs of one msgQueue, but the brokerOffset | |
is kept, then consumer will enter following situation: | |
1>. get pull offset with 0 when do rebalance, and set | |
m_offsetTable[mq] to 0; | |
2>. NO_NEW_MSG or NO_MATCHED_MSG got when pullMessage, and nextBegin | |
offset increase by 800 | |
3>. request->getMessage(msgs) always NULL | |
4>. we need update consumerOffset to nextBeginOffset indicated by | |
broker | |
but if really no new msg could be pulled, also go to this CASE | |
LOG_INFO("maybe misMatch between broker and client happens, update | |
consumerOffset to nextBeginOffset indicated by broker");*/ | |
m_callbackOwner->updateConsumeOffset(m_pullRequest->m_messageQueue, | |
result.nextBeginOffset); | |
} | |
if (bProducePullRequest) | |
m_callbackOwner->producePullMsgTask(m_pullRequest); | |
else | |
m_pullRequest->removePullMsgEvent(); | |
/*LOG_INFO("NO_MATCHED_MSG:%s,nextBeginOffset:%lld", | |
(m_pullRequest->m_messageQueue).toString().c_str(), | |
result.nextBeginOffset);*/ | |
break; | |
} | |
case OFFSET_ILLEGAL: { | |
m_pullRequest->setNextOffset(result.nextBeginOffset); | |
if (bProducePullRequest) | |
m_callbackOwner->producePullMsgTask(m_pullRequest); | |
else | |
m_pullRequest->removePullMsgEvent(); | |
/*LOG_INFO("OFFSET_ILLEGAL:%s,nextBeginOffset:%lld", | |
(m_pullRequest->m_messageQueue).toString().c_str(), | |
result.nextBeginOffset);*/ | |
break; | |
} | |
case BROKER_TIMEOUT: { // as BROKER_TIMEOUT is defined by client, broker | |
// will not returns this status, so this case | |
// could not be entered. | |
LOG_ERROR("impossible BROKER_TIMEOUT Occurs"); | |
m_pullRequest->setNextOffset(result.nextBeginOffset); | |
if (bProducePullRequest) | |
m_callbackOwner->producePullMsgTask(m_pullRequest); | |
else | |
m_pullRequest->removePullMsgEvent(); | |
break; | |
} | |
} | |
} | |
virtual void onException(MQException &e) { | |
if (m_bShutdown == true) { | |
LOG_INFO("pullrequest for:%s in shutdown, return", | |
(m_pullRequest->m_messageQueue).toString().c_str()); | |
m_pullRequest->removePullMsgEvent(); | |
return; | |
} | |
LOG_WARN("pullrequest for:%s occurs exception, reproduce it", | |
(m_pullRequest->m_messageQueue).toString().c_str()); | |
m_callbackOwner->producePullMsgTask(m_pullRequest); | |
} | |
void setShutdownStatus() { m_bShutdown = true; } | |
private: | |
DefaultMQPushConsumer *m_callbackOwner; | |
PullRequest *m_pullRequest; | |
bool m_bShutdown; | |
}; | |
//<!*************************************************************************** | |
static boost::mutex m_asyncCallbackLock; | |
DefaultMQPushConsumer::DefaultMQPushConsumer(const string &groupname) | |
: m_consumeFromWhere(CONSUME_FROM_LAST_OFFSET), | |
m_pOffsetStore(NULL), | |
m_pPullAPIWrapper(NULL), | |
m_pMessageListener(NULL), | |
m_consumeMessageBatchMaxSize(1), | |
m_maxMsgCacheSize(1000) { | |
//<!set default group name; | |
string gname = groupname.empty() ? DEFAULT_CONSUMER_GROUP : groupname; | |
setGroupName(gname); | |
m_asyncPull = true; | |
m_asyncPullTimeout = 30 * 1000; | |
setMessageModel(CLUSTERING); | |
m_startTime = UtilAll::currentTimeMillis(); | |
m_consumeThreadCount = boost::thread::hardware_concurrency(); | |
m_pullMsgThreadPoolNum = boost::thread::hardware_concurrency(); | |
m_async_service_thread.reset(new boost::thread( | |
boost::bind(&DefaultMQPushConsumer::boost_asio_work, this))); | |
} | |
void DefaultMQPushConsumer::boost_asio_work() { | |
LOG_INFO("DefaultMQPushConsumer::boost asio async service runing"); | |
boost::asio::io_service::work work(m_async_ioService); // avoid async io | |
// service stops after | |
// first timer timeout | |
// callback | |
m_async_ioService.run(); | |
} | |
DefaultMQPushConsumer::~DefaultMQPushConsumer() { | |
m_pMessageListener = NULL; | |
if (m_pullmsgQueue != NULL) { | |
deleteAndZero(m_pullmsgQueue); | |
} | |
if (m_pRebalance != NULL) { | |
deleteAndZero(m_pRebalance); | |
} | |
if (m_pOffsetStore != NULL) { | |
deleteAndZero(m_pOffsetStore); | |
} | |
if (m_pPullAPIWrapper != NULL) { | |
deleteAndZero(m_pPullAPIWrapper); | |
} | |
if (m_consumerService != NULL) { | |
deleteAndZero(m_consumerService); | |
} | |
PullMAP::iterator it = m_PullCallback.begin(); | |
for (; it != m_PullCallback.end(); ++it) { | |
deleteAndZero(it->second); | |
} | |
m_PullCallback.clear(); | |
m_subTopics.clear(); | |
} | |
void DefaultMQPushConsumer::sendMessageBack(MQMessageExt &msg, int delayLevel) { | |
try { | |
getFactory()->getMQClientAPIImpl()->consumerSendMessageBack( | |
msg, getGroupName(), delayLevel, 3000, getSessionCredentials()); | |
} catch (MQException &e) { | |
LOG_ERROR(e.what()); | |
} | |
} | |
void DefaultMQPushConsumer::fetchSubscribeMessageQueues( | |
const string &topic, vector<MQMessageQueue> &mqs) { | |
mqs.clear(); | |
try { | |
getFactory()->fetchSubscribeMessageQueues(topic, mqs, | |
getSessionCredentials()); | |
} catch (MQException &e) { | |
LOG_ERROR(e.what()); | |
} | |
} | |
void DefaultMQPushConsumer::doRebalance() { | |
if (isServiceStateOk()) { | |
try { | |
m_pRebalance->doRebalance(); | |
} catch (MQException &e) { | |
LOG_ERROR(e.what()); | |
} | |
} | |
} | |
void DefaultMQPushConsumer::persistConsumerOffset() { | |
if (isServiceStateOk()) { | |
m_pRebalance->persistConsumerOffset(); | |
} | |
} | |
void DefaultMQPushConsumer::persistConsumerOffsetByResetOffset() { | |
if (isServiceStateOk()) { | |
m_pRebalance->persistConsumerOffsetByResetOffset(); | |
} | |
} | |
void DefaultMQPushConsumer::start() { | |
#ifndef WIN32 | |
/* Ignore the SIGPIPE */ | |
struct sigaction sa = {0}; | |
sa.sa_handler = SIG_IGN; | |
sa.sa_flags = 0; | |
sigaction(SIGPIPE, &sa, 0); | |
#endif | |
switch (m_serviceState) { | |
case CREATE_JUST: { | |
m_serviceState = START_FAILED; | |
MQClient::start(); | |
LOG_INFO("DefaultMQPushConsumer:%s start", m_GroupName.c_str()); | |
//<!data; | |
checkConfig(); | |
//<!create rebalance; | |
m_pRebalance = new RebalancePush(this, getFactory()); | |
string groupname = getGroupName(); | |
m_pPullAPIWrapper = new PullAPIWrapper(getFactory(), groupname); | |
if (m_pMessageListener) { | |
if (m_pMessageListener->getMessageListenerType() == | |
messageListenerOrderly) { | |
LOG_INFO("start orderly consume service:%s", getGroupName().c_str()); | |
m_consumerService = new ConsumeMessageOrderlyService( | |
this, m_consumeThreadCount, m_pMessageListener); | |
} else // for backward compatible, defaultly and concurrently listeners | |
// are allocating ConsumeMessageConcurrentlyService | |
{ | |
LOG_INFO("start concurrently consume service:%s", | |
getGroupName().c_str()); | |
m_consumerService = new ConsumeMessageConcurrentlyService( | |
this, m_consumeThreadCount, m_pMessageListener); | |
} | |
} | |
m_pullmsgQueue = new TaskQueue(m_pullMsgThreadPoolNum); | |
m_pullmsgThread.reset(new boost::thread(boost::bind( | |
&DefaultMQPushConsumer::runPullMsgQueue, this, m_pullmsgQueue))); | |
copySubscription(); | |
//<! registe; | |
bool registerOK = getFactory()->registerConsumer(this); | |
if (!registerOK) { | |
m_serviceState = CREATE_JUST; | |
THROW_MQEXCEPTION( | |
MQClientException, | |
"The cousumer group[" + getGroupName() + | |
"] has been created before, specify another name please.", | |
-1); | |
} | |
//<!msg model; | |
switch (getMessageModel()) { | |
case BROADCASTING: | |
m_pOffsetStore = new LocalFileOffsetStore(groupname, getFactory()); | |
break; | |
case CLUSTERING: | |
m_pOffsetStore = new RemoteBrokerOffsetStore(groupname, getFactory()); | |
break; | |
} | |
bool bStartFailed = false; | |
string errorMsg; | |
try { | |
m_pOffsetStore->load(); | |
} catch (MQClientException &e) { | |
bStartFailed = true; | |
errorMsg = std::string(e.what()); | |
} | |
m_consumerService->start(); | |
getFactory()->start(); | |
updateTopicSubscribeInfoWhenSubscriptionChanged(); | |
getFactory()->sendHeartbeatToAllBroker(); | |
m_serviceState = RUNNING; | |
if (bStartFailed) { | |
shutdown(); | |
THROW_MQEXCEPTION(MQClientException, errorMsg, -1); | |
} | |
break; | |
} | |
case RUNNING: | |
case START_FAILED: | |
case SHUTDOWN_ALREADY: | |
break; | |
default: | |
break; | |
} | |
getFactory()->rebalanceImmediately(); | |
} | |
void DefaultMQPushConsumer::shutdown() { | |
switch (m_serviceState) { | |
case RUNNING: { | |
LOG_INFO("DefaultMQPushConsumer shutdown"); | |
m_async_ioService.stop(); | |
m_async_service_thread->interrupt(); | |
m_async_service_thread->join(); | |
m_pullmsgQueue->close(); | |
m_pullmsgThread->interrupt(); | |
m_pullmsgThread->join(); | |
m_consumerService->shutdown(); | |
persistConsumerOffset(); | |
shutdownAsyncPullCallBack(); // delete aync pullMsg resources | |
getFactory()->unregisterConsumer(this); | |
getFactory()->shutdown(); | |
m_serviceState = SHUTDOWN_ALREADY; | |
break; | |
} | |
case CREATE_JUST: | |
case SHUTDOWN_ALREADY: | |
break; | |
default: | |
break; | |
} | |
} | |
void DefaultMQPushConsumer::registerMessageListener( | |
MQMessageListener *pMessageListener) { | |
if (NULL != pMessageListener) { | |
m_pMessageListener = pMessageListener; | |
} | |
} | |
MessageListenerType DefaultMQPushConsumer::getMessageListenerType() { | |
if (NULL != m_pMessageListener) { | |
return m_pMessageListener->getMessageListenerType(); | |
} | |
return messageListenerDefaultly; | |
} | |
ConsumeMsgService *DefaultMQPushConsumer::getConsumerMsgService() const { | |
return m_consumerService; | |
} | |
OffsetStore *DefaultMQPushConsumer::getOffsetStore() const { | |
return m_pOffsetStore; | |
} | |
Rebalance *DefaultMQPushConsumer::getRebalance() const { return m_pRebalance; } | |
void DefaultMQPushConsumer::subscribe(const string &topic, | |
const string &subExpression) { | |
m_subTopics[topic] = subExpression; | |
} | |
void DefaultMQPushConsumer::checkConfig() { | |
string groupname = getGroupName(); | |
// check consumerGroup | |
Validators::checkGroup(groupname); | |
// consumerGroup | |
if (!groupname.compare(DEFAULT_CONSUMER_GROUP)) { | |
THROW_MQEXCEPTION(MQClientException, | |
"consumerGroup can not equal DEFAULT_CONSUMER", -1); | |
} | |
if (getMessageModel() != BROADCASTING && getMessageModel() != CLUSTERING) { | |
THROW_MQEXCEPTION(MQClientException, "messageModel is valid ", -1); | |
} | |
if (m_pMessageListener == NULL) { | |
THROW_MQEXCEPTION(MQClientException, "messageListener is null ", -1); | |
} | |
} | |
void DefaultMQPushConsumer::copySubscription() { | |
map<string, string>::iterator it = m_subTopics.begin(); | |
for (; it != m_subTopics.end(); ++it) { | |
LOG_INFO("buildSubscriptionData,:%s,%s", it->first.c_str(), | |
it->second.c_str()); | |
unique_ptr<SubscriptionData> pSData( | |
FilterAPI::buildSubscriptionData(it->first, it->second)); | |
m_pRebalance->setSubscriptionData(it->first, pSData.release()); | |
} | |
switch (getMessageModel()) { | |
case BROADCASTING: | |
break; | |
case CLUSTERING: { | |
string retryTopic = UtilAll::getRetryTopic(getGroupName()); | |
//<!this sub; | |
unique_ptr<SubscriptionData> pSData( | |
FilterAPI::buildSubscriptionData(retryTopic, SUB_ALL)); | |
m_pRebalance->setSubscriptionData(retryTopic, pSData.release()); | |
break; | |
} | |
default: | |
break; | |
} | |
} | |
void DefaultMQPushConsumer::updateTopicSubscribeInfo( | |
const string &topic, vector<MQMessageQueue> &info) { | |
m_pRebalance->setTopicSubscribeInfo(topic, info); | |
} | |
void DefaultMQPushConsumer::updateTopicSubscribeInfoWhenSubscriptionChanged() { | |
map<string, SubscriptionData *> &subTable = | |
m_pRebalance->getSubscriptionInner(); | |
map<string, SubscriptionData *>::iterator it = subTable.begin(); | |
for (; it != subTable.end(); ++it) { | |
bool btopic = getFactory()->updateTopicRouteInfoFromNameServer( | |
it->first, getSessionCredentials()); | |
if (btopic == false) { | |
LOG_WARN("The topic:[%s] not exist", it->first.c_str()); | |
} | |
} | |
} | |
ConsumeType DefaultMQPushConsumer::getConsumeType() { | |
return CONSUME_PASSIVELY; | |
} | |
ConsumeFromWhere DefaultMQPushConsumer::getConsumeFromWhere() { | |
return m_consumeFromWhere; | |
} | |
void DefaultMQPushConsumer::setConsumeFromWhere( | |
ConsumeFromWhere consumeFromWhere) { | |
m_consumeFromWhere = consumeFromWhere; | |
} | |
void DefaultMQPushConsumer::getSubscriptions(vector<SubscriptionData> &result) { | |
map<string, SubscriptionData *> &subTable = | |
m_pRebalance->getSubscriptionInner(); | |
map<string, SubscriptionData *>::iterator it = subTable.begin(); | |
for (; it != subTable.end(); ++it) { | |
result.push_back(*(it->second)); | |
} | |
} | |
void DefaultMQPushConsumer::updateConsumeOffset(const MQMessageQueue &mq, | |
int64 offset) { | |
if (offset >= 0) { | |
m_pOffsetStore->updateOffset(mq, offset); | |
} else { | |
LOG_ERROR("updateConsumeOffset of mq:%s error", mq.toString().c_str()); | |
} | |
} | |
void DefaultMQPushConsumer::removeConsumeOffset(const MQMessageQueue &mq) { | |
m_pOffsetStore->removeOffset(mq); | |
} | |
void DefaultMQPushConsumer::triggerNextPullRequest( | |
boost::asio::deadline_timer *t, PullRequest *request) { | |
// LOG_INFO("trigger pullrequest for:%s", | |
// (request->m_messageQueue).toString().c_str()); | |
producePullMsgTask(request); | |
deleteAndZero(t); | |
} | |
void DefaultMQPushConsumer::producePullMsgTask(PullRequest *request) { | |
if (m_pullmsgQueue->bTaskQueueStatusOK() && isServiceStateOk()) { | |
request->addPullMsgEvent(); | |
if (m_asyncPull) { | |
m_pullmsgQueue->produce(TaskBinder::gen( | |
&DefaultMQPushConsumer::pullMessageAsync, this, request)); | |
} else { | |
m_pullmsgQueue->produce( | |
TaskBinder::gen(&DefaultMQPushConsumer::pullMessage, this, request)); | |
} | |
} else { | |
LOG_WARN("produce pullmsg of mq:%s failed", | |
request->m_messageQueue.toString().c_str()); | |
} | |
} | |
void DefaultMQPushConsumer::runPullMsgQueue(TaskQueue *pTaskQueue) { | |
pTaskQueue->run(); | |
} | |
void DefaultMQPushConsumer::pullMessage(PullRequest *request) { | |
if (request == NULL) { | |
LOG_ERROR("Pull request is NULL, return"); | |
return; | |
} | |
if (request->isDroped()) { | |
LOG_WARN("Pull request is set drop with mq:%s, return", | |
(request->m_messageQueue).toString().c_str()); | |
request->removePullMsgEvent(); | |
return; | |
} | |
MQMessageQueue &messageQueue = request->m_messageQueue; | |
if (m_consumerService->getConsumeMsgSerivceListenerType() == | |
messageListenerOrderly) { | |
if (!request->isLocked() || request->isLockExpired()) { | |
if (!m_pRebalance->lock(messageQueue)) { | |
producePullMsgTask(request); | |
return; | |
} | |
} | |
} | |
if (request->getCacheMsgCount() > m_maxMsgCacheSize) { | |
// LOG_INFO("retry pullrequest for:%s after 1s, as cachMsgSize:%d is larger | |
// than:%d", (request->m_messageQueue).toString().c_str(), | |
// request->getCacheMsgCount(), m_maxMsgCacheSize); | |
boost::asio::deadline_timer *t = new boost::asio::deadline_timer( | |
m_async_ioService, boost::posix_time::milliseconds(1 * 1000)); | |
t->async_wait(boost::bind(&DefaultMQPushConsumer::triggerNextPullRequest, | |
this, t, request)); | |
return; | |
} | |
bool commitOffsetEnable = false; | |
int64 commitOffsetValue = 0; | |
if (CLUSTERING == getMessageModel()) { | |
commitOffsetValue = m_pOffsetStore->readOffset( | |
messageQueue, READ_FROM_MEMORY, getSessionCredentials()); | |
if (commitOffsetValue > 0) { | |
commitOffsetEnable = true; | |
} | |
} | |
string subExpression; | |
SubscriptionData *pSdata = | |
m_pRebalance->getSubscriptionData(messageQueue.getTopic()); | |
if (pSdata == NULL) { | |
producePullMsgTask(request); | |
return; | |
} | |
subExpression = pSdata->getSubString(); | |
int sysFlag = | |
PullSysFlag::buildSysFlag(commitOffsetEnable, // commitOffset | |
false, // suspend | |
!subExpression.empty(), // subscription | |
false); // class filter | |
try { | |
request->setLastPullTimestamp(UtilAll::currentTimeMillis()); | |
unique_ptr<PullResult> result( | |
m_pPullAPIWrapper->pullKernelImpl(messageQueue, // 1 | |
subExpression, // 2 | |
pSdata->getSubVersion(), // 3 | |
request->getNextOffset(), // 4 | |
32, // 5 | |
sysFlag, // 6 | |
commitOffsetValue, // 7 | |
1000 * 15, // 8 | |
1000 * 30, // 9 | |
ComMode_SYNC, // 10 | |
NULL, getSessionCredentials())); | |
PullResult pullResult = m_pPullAPIWrapper->processPullResult( | |
messageQueue, result.get(), pSdata); | |
switch (pullResult.pullStatus) { | |
case FOUND: { | |
if (!request->isDroped()) // if request is setted to dropped, don't add | |
// msgFoundList to m_msgTreeMap and don't | |
// call producePullMsgTask | |
{ // avoid issue: pullMsg is sent out, rebalance is doing concurrently | |
// and this request is dropped, and then received pulled msgs. | |
request->setNextOffset(pullResult.nextBeginOffset); | |
request->putMessage(pullResult.msgFoundList); | |
m_consumerService->submitConsumeRequest(request, | |
pullResult.msgFoundList); | |
producePullMsgTask(request); | |
LOG_DEBUG("FOUND:%s with size:" | |
SIZET_FMT | |
",nextBeginOffset:%lld", | |
messageQueue.toString().c_str(), | |
pullResult.msgFoundList.size(), pullResult.nextBeginOffset); | |
} else { | |
request->removePullMsgEvent(); | |
} | |
break; | |
} | |
case NO_NEW_MSG: { | |
request->setNextOffset(pullResult.nextBeginOffset); | |
vector<MQMessageExt> msgs; | |
request->getMessage(msgs); | |
if ((msgs.size() == 0) && (pullResult.nextBeginOffset > 0)) { | |
/*if broker losted/cleared msgs of one msgQueue, but the brokerOffset | |
is kept, then consumer will enter following situation: | |
1>. get pull offset with 0 when do rebalance, and set | |
m_offsetTable[mq] to 0; | |
2>. NO_NEW_MSG or NO_MATCHED_MSG got when pullMessage, and nextBegin | |
offset increase by 800 | |
3>. request->getMessage(msgs) always NULL | |
4>. we need update consumerOffset to nextBeginOffset indicated by | |
broker | |
but if really no new msg could be pulled, also go to this CASE | |
*/ | |
// LOG_DEBUG("maybe misMatch between broker and client happens, update | |
// consumerOffset to nextBeginOffset indicated by broker"); | |
updateConsumeOffset(messageQueue, pullResult.nextBeginOffset); | |
} | |
producePullMsgTask(request); | |
LOG_DEBUG("NO_NEW_MSG:%s,nextBeginOffset:%lld", | |
messageQueue.toString().c_str(), pullResult.nextBeginOffset); | |
break; | |
} | |
case NO_MATCHED_MSG: { | |
request->setNextOffset(pullResult.nextBeginOffset); | |
vector<MQMessageExt> msgs; | |
request->getMessage(msgs); | |
if ((msgs.size() == 0) && (pullResult.nextBeginOffset > 0)) { | |
// LOG_DEBUG("maybe misMatch between broker and client happens, update | |
// consumerOffset to nextBeginOffset indicated by broker"); | |
updateConsumeOffset(messageQueue, pullResult.nextBeginOffset); | |
} | |
producePullMsgTask(request); | |
LOG_DEBUG("NO_MATCHED_MSG:%s,nextBeginOffset:%lld", | |
messageQueue.toString().c_str(), pullResult.nextBeginOffset); | |
break; | |
} | |
case OFFSET_ILLEGAL: { | |
request->setNextOffset(pullResult.nextBeginOffset); | |
producePullMsgTask(request); | |
LOG_DEBUG("OFFSET_ILLEGAL:%s,nextBeginOffset:%lld", | |
messageQueue.toString().c_str(), pullResult.nextBeginOffset); | |
break; | |
} | |
case BROKER_TIMEOUT: { // as BROKER_TIMEOUT is defined by client, broker | |
// will not returns this status, so this case | |
// could not be entered. | |
LOG_ERROR("impossible BROKER_TIMEOUT Occurs"); | |
request->setNextOffset(pullResult.nextBeginOffset); | |
producePullMsgTask(request); | |
break; | |
} | |
} | |
} catch (MQException &e) { | |
LOG_ERROR(e.what()); | |
producePullMsgTask(request); | |
} | |
} | |
AsyncPullCallback *DefaultMQPushConsumer::getAsyncPullCallBack( | |
PullRequest *request, MQMessageQueue msgQueue) { | |
boost::lock_guard<boost::mutex> lock(m_asyncCallbackLock); | |
if (m_asyncPull && request) { | |
PullMAP::iterator it = m_PullCallback.find(msgQueue); | |
if (it == m_PullCallback.end()) { | |
LOG_INFO("new pull callback for mq:%s", msgQueue.toString().c_str()); | |
m_PullCallback[msgQueue] = new AsyncPullCallback(this, request); | |
} | |
return m_PullCallback[msgQueue]; | |
} | |
return NULL; | |
} | |
void DefaultMQPushConsumer::shutdownAsyncPullCallBack() { | |
boost::lock_guard<boost::mutex> lock(m_asyncCallbackLock); | |
if (m_asyncPull) { | |
PullMAP::iterator it = m_PullCallback.begin(); | |
for (; it != m_PullCallback.end(); ++it) { | |
if (it->second) { | |
it->second->setShutdownStatus(); | |
} else { | |
LOG_ERROR("could not find asyncPullCallback for:%s", | |
it->first.toString().c_str()); | |
} | |
} | |
} | |
} | |
void DefaultMQPushConsumer::pullMessageAsync(PullRequest *request) { | |
if (request == NULL) { | |
LOG_ERROR("Pull request is NULL, return"); | |
return; | |
} | |
if (request->isDroped()) { | |
LOG_WARN("Pull request is set drop with mq:%s, return", | |
(request->m_messageQueue).toString().c_str()); | |
request->removePullMsgEvent(); | |
return; | |
} | |
MQMessageQueue &messageQueue = request->m_messageQueue; | |
if (m_consumerService->getConsumeMsgSerivceListenerType() == | |
messageListenerOrderly) { | |
if (!request->isLocked() || request->isLockExpired()) { | |
if (!m_pRebalance->lock(messageQueue)) { | |
producePullMsgTask(request); | |
return; | |
} | |
} | |
} | |
if (request->getCacheMsgCount() > m_maxMsgCacheSize) { | |
// LOG_INFO("retry pullrequest for:%s after 1s, as cachMsgSize:%d is larger | |
// than:%d", (request->m_messageQueue).toString().c_str(), | |
// request->getCacheMsgCount(), m_maxMsgCacheSize); | |
boost::asio::deadline_timer *t = new boost::asio::deadline_timer( | |
m_async_ioService, boost::posix_time::milliseconds(1 * 1000)); | |
t->async_wait(boost::bind(&DefaultMQPushConsumer::triggerNextPullRequest, | |
this, t, request)); | |
return; | |
} | |
bool commitOffsetEnable = false; | |
int64 commitOffsetValue = 0; | |
if (CLUSTERING == getMessageModel()) { | |
commitOffsetValue = m_pOffsetStore->readOffset( | |
messageQueue, READ_FROM_MEMORY, getSessionCredentials()); | |
if (commitOffsetValue > 0) { | |
commitOffsetEnable = true; | |
} | |
} | |
string subExpression; | |
SubscriptionData *pSdata = | |
(m_pRebalance->getSubscriptionData(messageQueue.getTopic())); | |
if (pSdata == NULL) { | |
producePullMsgTask(request); | |
return; | |
} | |
subExpression = pSdata->getSubString(); | |
int sysFlag = | |
PullSysFlag::buildSysFlag(commitOffsetEnable, // commitOffset | |
true, // suspend | |
!subExpression.empty(), // subscription | |
false); // class filter | |
AsyncArg arg; | |
arg.mq = messageQueue; | |
arg.subData = *pSdata; | |
arg.pPullWrapper = m_pPullAPIWrapper; | |
try { | |
request->setLastPullTimestamp(UtilAll::currentTimeMillis()); | |
m_pPullAPIWrapper->pullKernelImpl( | |
messageQueue, // 1 | |
subExpression, // 2 | |
pSdata->getSubVersion(), // 3 | |
request->getNextOffset(), // 4 | |
32, // 5 | |
sysFlag, // 6 | |
commitOffsetValue, // 7 | |
1000 * 15, // 8 | |
m_asyncPullTimeout, // 9 | |
ComMode_ASYNC, // 10 | |
getAsyncPullCallBack(request, messageQueue), // 11 | |
getSessionCredentials(), // 12 | |
&arg); // 13 | |
} catch (MQException &e) { | |
LOG_ERROR(e.what()); | |
producePullMsgTask(request); | |
} | |
} | |
void DefaultMQPushConsumer::setAsyncPull(bool asyncFlag) { | |
if (asyncFlag) { | |
LOG_INFO("set pushConsumer:%s to async default pull mode", | |
getGroupName().c_str()); | |
} else { | |
LOG_INFO("set pushConsumer:%s to sync pull mode", getGroupName().c_str()); | |
} | |
m_asyncPull = asyncFlag; | |
} | |
void DefaultMQPushConsumer::setConsumeThreadCount(int threadCount) { | |
if (threadCount > 0) { | |
m_consumeThreadCount = threadCount; | |
} else { | |
LOG_ERROR("setConsumeThreadCount with invalid value"); | |
} | |
} | |
int DefaultMQPushConsumer::getConsumeThreadCount() const { | |
return m_consumeThreadCount; | |
} | |
void DefaultMQPushConsumer::setPullMsgThreadPoolCount(int threadCount) { | |
m_pullMsgThreadPoolNum = threadCount; | |
} | |
int DefaultMQPushConsumer::getPullMsgThreadPoolCount() const { | |
return m_pullMsgThreadPoolNum; | |
} | |
int DefaultMQPushConsumer::getConsumeMessageBatchMaxSize() const { | |
return m_consumeMessageBatchMaxSize; | |
} | |
void DefaultMQPushConsumer::setConsumeMessageBatchMaxSize( | |
int consumeMessageBatchMaxSize) { | |
if (consumeMessageBatchMaxSize >= 1) | |
m_consumeMessageBatchMaxSize = consumeMessageBatchMaxSize; | |
} | |
void DefaultMQPushConsumer::setMaxCacheMsgSizePerQueue(int maxCacheSize) { | |
if (maxCacheSize > 0 && maxCacheSize < 65535) { | |
LOG_INFO("set maxCacheSize to:%d for consumer:%s", maxCacheSize, | |
getGroupName().c_str()); | |
m_maxMsgCacheSize = maxCacheSize; | |
} | |
} | |
int DefaultMQPushConsumer::getMaxCacheMsgSizePerQueue() const { | |
return m_maxMsgCacheSize; | |
} | |
ConsumerRunningInfo *DefaultMQPushConsumer::getConsumerRunningInfo() { | |
ConsumerRunningInfo *info = new ConsumerRunningInfo(); | |
if (info) { | |
if (m_consumerService->getConsumeMsgSerivceListenerType() == | |
messageListenerOrderly) | |
info->setProperty(ConsumerRunningInfo::PROP_CONSUME_ORDERLY, "true"); | |
else | |
info->setProperty(ConsumerRunningInfo::PROP_CONSUME_ORDERLY, "flase"); | |
info->setProperty(ConsumerRunningInfo::PROP_THREADPOOL_CORE_SIZE, | |
UtilAll::to_string(m_consumeThreadCount)); | |
info->setProperty(ConsumerRunningInfo::PROP_CONSUMER_START_TIMESTAMP, | |
UtilAll::to_string(m_startTime)); | |
vector<SubscriptionData> result; | |
getSubscriptions(result); | |
info->setSubscriptionSet(result); | |
map<MQMessageQueue, PullRequest *> requestTable = | |
m_pRebalance->getPullRequestTable(); | |
map<MQMessageQueue, PullRequest *>::iterator it = requestTable.begin(); | |
for (; it != requestTable.end(); ++it) { | |
if (!it->second->isDroped()) { | |
map<MessageQueue, ProcessQueueInfo> queueTable; | |
MessageQueue queue((it->first).getTopic(), (it->first).getBrokerName(), | |
(it->first).getQueueId()); | |
ProcessQueueInfo processQueue; | |
processQueue.cachedMsgMinOffset = it->second->getCacheMinOffset(); | |
processQueue.cachedMsgMaxOffset = it->second->getCacheMaxOffset(); | |
processQueue.cachedMsgCount = it->second->getCacheMsgCount(); | |
processQueue.setCommitOffset(m_pOffsetStore->readOffset( | |
it->first, MEMORY_FIRST_THEN_STORE, getSessionCredentials())); | |
processQueue.setDroped(it->second->isDroped()); | |
processQueue.setLocked(it->second->isLocked()); | |
processQueue.lastLockTimestamp = it->second->getLastLockTimestamp(); | |
processQueue.lastPullTimestamp = it->second->getLastPullTimestamp(); | |
processQueue.lastConsumeTimestamp = | |
it->second->getLastConsumeTimestamp(); | |
info->setMqTable(queue, processQueue); | |
} | |
} | |
return info; | |
} | |
return NULL; | |
} | |
//<!************************************************************************ | |
} //<!end namespace; |