/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "DefaultMQPushConsumer.h"
#include "CommunicationMode.h"
#include "ConsumeMsgService.h"
#include "ConsumerRunningInfo.h"
#include "FilterAPI.h"
#include "Logging.h"
#include "MQClientAPIImpl.h"
#include "MQClientFactory.h"
#include "MQClientManager.h"
#include "MQProtos.h"
#include "OffsetStore.h"
#include "PullAPIWrapper.h"
#include "PullSysFlag.h"
#include "Rebalance.h"
#include "UtilAll.h"
#include "Validators.h"
#include "task_queue.h"
namespace rocketmq {
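// Completion callback for asynchronous pulls: on success it hands the pulled
// messages to the consume service and, when requested, re-produces the next
// pull task; on exception it re-produces the pull task so pulling continues.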
class AsyncPullCallback : public PullCallback {
public:
AsyncPullCallback(DefaultMQPushConsumer* pushConsumer, PullRequest* request)
: m_callbackOwner(pushConsumer), m_pullRequest(request), m_bShutdown(false) {}
virtual ~AsyncPullCallback() {
m_callbackOwner = NULL;
m_pullRequest = NULL;
}
virtual void onSuccess(MQMessageQueue& mq, PullResult& result, bool bProducePullRequest) {
if (m_bShutdown) {
LOG_INFO("pullrequest for:%s in shutdown, return", (m_pullRequest->m_messageQueue).toString().c_str());
m_pullRequest->removePullMsgEvent();
return;
}
switch (result.pullStatus) {
case FOUND: {
// If the request has been set to dropped, don't add msgFoundList to
// m_msgTreeMap and don't call producePullMsgTask. This avoids a race:
// a pull is sent out, rebalance runs concurrently and drops this
// request, and then the pulled messages arrive.
if (!m_pullRequest->isDroped()) {
m_pullRequest->setNextOffset(result.nextBeginOffset);
m_pullRequest->putMessage(result.msgFoundList);
m_callbackOwner->getConsumerMsgService()->submitConsumeRequest(m_pullRequest, result.msgFoundList);
if (bProducePullRequest)
m_callbackOwner->producePullMsgTask(m_pullRequest);
else
m_pullRequest->removePullMsgEvent();
LOG_DEBUG("FOUND:%s with size:" SIZET_FMT ", nextBeginOffset:%lld",
(m_pullRequest->m_messageQueue).toString().c_str(), result.msgFoundList.size(),
result.nextBeginOffset);
} else {
LOG_INFO("remove pullmsg event of mq:%s", (m_pullRequest->m_messageQueue).toString().c_str());
m_pullRequest->removePullMsgEvent();
}
break;
}
case NO_NEW_MSG: {
m_pullRequest->setNextOffset(result.nextBeginOffset);
vector<MQMessageExt> msgs;
m_pullRequest->getMessage(msgs);
if ((msgs.size() == 0) && (result.nextBeginOffset > 0)) {
/* If the broker lost/cleared the messages of a message queue but kept the
broker offset, the consumer enters the following situation:
1. rebalance reads a pull offset of 0 and sets m_offsetTable[mq] to 0;
2. pullMessage gets NO_NEW_MSG or NO_MATCHED_MSG, and nextBeginOffset
increases by 800;
3. request->getMessage(msgs) always returns nothing;
4. so the consumer offset must be updated to the nextBeginOffset
indicated by the broker.
Note: when there is genuinely no new message to pull, this case is
entered as well. */
m_callbackOwner->updateConsumeOffset(m_pullRequest->m_messageQueue, result.nextBeginOffset);
}
if (bProducePullRequest)
m_callbackOwner->producePullMsgTask(m_pullRequest);
else
m_pullRequest->removePullMsgEvent();
/*LOG_INFO("NO_NEW_MSG:%s,nextBeginOffset:%lld",
(m_pullRequest->m_messageQueue).toString().c_str(),
result.nextBeginOffset);*/
break;
}
case NO_MATCHED_MSG: {
m_pullRequest->setNextOffset(result.nextBeginOffset);
vector<MQMessageExt> msgs;
m_pullRequest->getMessage(msgs);
if ((msgs.size() == 0) && (result.nextBeginOffset > 0)) {
/* If the broker lost/cleared the messages of a message queue but kept the
broker offset, the consumer enters the following situation:
1. rebalance reads a pull offset of 0 and sets m_offsetTable[mq] to 0;
2. pullMessage gets NO_NEW_MSG or NO_MATCHED_MSG, and nextBeginOffset
increases by 800;
3. request->getMessage(msgs) always returns nothing;
4. so the consumer offset must be updated to the nextBeginOffset
indicated by the broker.
Note: when there is genuinely no new message to pull, this case is
entered as well. */
m_callbackOwner->updateConsumeOffset(m_pullRequest->m_messageQueue, result.nextBeginOffset);
}
if (bProducePullRequest)
m_callbackOwner->producePullMsgTask(m_pullRequest);
else
m_pullRequest->removePullMsgEvent();
/*LOG_INFO("NO_MATCHED_MSG:%s,nextBeginOffset:%lld",
(m_pullRequest->m_messageQueue).toString().c_str(),
result.nextBeginOffset);*/
break;
}
case OFFSET_ILLEGAL: {
m_pullRequest->setNextOffset(result.nextBeginOffset);
if (bProducePullRequest)
m_callbackOwner->producePullMsgTask(m_pullRequest);
else
m_pullRequest->removePullMsgEvent();
/*LOG_INFO("OFFSET_ILLEGAL:%s,nextBeginOffset:%lld",
(m_pullRequest->m_messageQueue).toString().c_str(),
result.nextBeginOffset);*/
break;
}
// BROKER_TIMEOUT is defined by the client; the broker never returns this
// status, so this case should never be entered.
case BROKER_TIMEOUT: {
LOG_ERROR("impossible BROKER_TIMEOUT occurs");
m_pullRequest->setNextOffset(result.nextBeginOffset);
if (bProducePullRequest)
m_callbackOwner->producePullMsgTask(m_pullRequest);
else
m_pullRequest->removePullMsgEvent();
break;
}
}
}
virtual void onException(MQException& e) {
if (m_bShutdown) {
LOG_INFO("pullrequest for:%s in shutdown, return", (m_pullRequest->m_messageQueue).toString().c_str());
m_pullRequest->removePullMsgEvent();
return;
}
LOG_WARN("pullrequest for:%s occurs exception, reproduce it", (m_pullRequest->m_messageQueue).toString().c_str());
m_callbackOwner->producePullMsgTask(m_pullRequest);
}
void setShutdownStatus() { m_bShutdown = true; }
private:
DefaultMQPushConsumer* m_callbackOwner;
PullRequest* m_pullRequest;
bool m_bShutdown;
};
//<!***************************************************************************
static boost::mutex m_asyncCallbackLock;
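/*
 * Minimal usage sketch (illustrative only: "ExampleListener" and the name
 * server address are hypothetical, and setNamesrvAddr is assumed to be
 * inherited from MQClient):
 *
 *   DefaultMQPushConsumer consumer("example_group");
 *   consumer.setNamesrvAddr("127.0.0.1:9876");
 *   consumer.subscribe("ExampleTopic", "*");
 *   ExampleListener listener;  // user-defined MQMessageListener subclass
 *   consumer.registerMessageListener(&listener);
 *   consumer.start();
 *   // ... messages are pushed to the listener's consumeMessage() ...
 *   consumer.shutdown();
 */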
DefaultMQPushConsumer::DefaultMQPushConsumer(const string& groupname)
: m_consumeFromWhere(CONSUME_FROM_LAST_OFFSET),
m_pOffsetStore(NULL),
m_pRebalance(NULL),
m_pPullAPIWrapper(NULL),
m_consumerService(NULL),
m_pMessageListener(NULL),
m_consumeMessageBatchMaxSize(1),
m_maxMsgCacheSize(1000),
m_pullmsgQueue(NULL) {
//<!set default group name;
string gname = groupname.empty() ? DEFAULT_CONSUMER_GROUP : groupname;
setGroupName(gname);
m_asyncPull = true;
m_asyncPullTimeout = 30 * 1000;
setMessageModel(CLUSTERING);
m_startTime = UtilAll::currentTimeMillis();
m_consumeThreadCount = boost::thread::hardware_concurrency();
m_pullMsgThreadPoolNum = boost::thread::hardware_concurrency();
m_async_service_thread.reset(new boost::thread(boost::bind(&DefaultMQPushConsumer::boost_asio_work, this)));
}
void DefaultMQPushConsumer::boost_asio_work() {
LOG_INFO("DefaultMQPushConsumer::boost asio async service runing");
// keep the io_service running after the first timer callback completes
boost::asio::io_service::work work(m_async_ioService);
m_async_ioService.run();
}
DefaultMQPushConsumer::~DefaultMQPushConsumer() {
m_pMessageListener = NULL;
if (m_pullmsgQueue != NULL) {
deleteAndZero(m_pullmsgQueue);
}
if (m_pRebalance != NULL) {
deleteAndZero(m_pRebalance);
}
if (m_pOffsetStore != NULL) {
deleteAndZero(m_pOffsetStore);
}
if (m_pPullAPIWrapper != NULL) {
deleteAndZero(m_pPullAPIWrapper);
}
if (m_consumerService != NULL) {
deleteAndZero(m_consumerService);
}
PullMAP::iterator it = m_PullCallback.begin();
for (; it != m_PullCallback.end(); ++it) {
deleteAndZero(it->second);
}
m_PullCallback.clear();
m_subTopics.clear();
}
void DefaultMQPushConsumer::sendMessageBack(MQMessageExt& msg, int delayLevel) {
try {
getFactory()->getMQClientAPIImpl()->consumerSendMessageBack(msg, getGroupName(), delayLevel, 3000,
getSessionCredentials());
} catch (MQException& e) {
LOG_ERROR(e.what());
}
}
void DefaultMQPushConsumer::fetchSubscribeMessageQueues(const string& topic, vector<MQMessageQueue>& mqs) {
mqs.clear();
try {
getFactory()->fetchSubscribeMessageQueues(topic, mqs, getSessionCredentials());
} catch (MQException& e) {
LOG_ERROR(e.what());
}
}
void DefaultMQPushConsumer::doRebalance() {
if (isServiceStateOk()) {
try {
m_pRebalance->doRebalance();
} catch (MQException& e) {
LOG_ERROR(e.what());
}
}
}
void DefaultMQPushConsumer::persistConsumerOffset() {
if (isServiceStateOk()) {
m_pRebalance->persistConsumerOffset();
}
}
void DefaultMQPushConsumer::persistConsumerOffsetByResetOffset() {
if (isServiceStateOk()) {
m_pRebalance->persistConsumerOffsetByResetOffset();
}
}
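// start() drives the service-state machine: from CREATE_JUST it builds the
// rebalance, pull-API, consume-service and offset-store components, registers
// the consumer with the client factory, and moves to RUNNING; in any other
// state it is a no-op. If loading the offset store failed, start() shuts the
// consumer down and rethrows after reaching RUNNING.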
void DefaultMQPushConsumer::start() {
#ifndef WIN32
/* Ignore the SIGPIPE */
struct sigaction sa;
memset(&sa, 0, sizeof(struct sigaction));
sa.sa_handler = SIG_IGN;
sa.sa_flags = 0;
sigaction(SIGPIPE, &sa, 0);
#endif
switch (m_serviceState) {
case CREATE_JUST: {
m_serviceState = START_FAILED;
MQClient::start();
LOG_INFO("DefaultMQPushConsumer:%s start", m_GroupName.c_str());
//<!data;
checkConfig();
//<!create rebalance;
m_pRebalance = new RebalancePush(this, getFactory());
string groupname = getGroupName();
m_pPullAPIWrapper = new PullAPIWrapper(getFactory(), groupname);
if (m_pMessageListener) {
if (m_pMessageListener->getMessageListenerType() == messageListenerOrderly) {
LOG_INFO("start orderly consume service:%s", getGroupName().c_str());
m_consumerService = new ConsumeMessageOrderlyService(this, m_consumeThreadCount, m_pMessageListener);
} else {
// For backward compatibility, default and concurrent listeners both get
// a ConsumeMessageConcurrentlyService.
LOG_INFO("start concurrently consume service:%s", getGroupName().c_str());
m_consumerService = new ConsumeMessageConcurrentlyService(this, m_consumeThreadCount, m_pMessageListener);
}
}
m_pullmsgQueue = new TaskQueue(m_pullMsgThreadPoolNum);
m_pullmsgThread.reset(
new boost::thread(boost::bind(&DefaultMQPushConsumer::runPullMsgQueue, this, m_pullmsgQueue)));
copySubscription();
//<! register;
bool registerOK = getFactory()->registerConsumer(this);
if (!registerOK) {
m_serviceState = CREATE_JUST;
THROW_MQEXCEPTION(
MQClientException,
"The cousumer group[" + getGroupName() + "] has been created before, specify another name please.", -1);
}
//<!msg model;
switch (getMessageModel()) {
case BROADCASTING:
m_pOffsetStore = new LocalFileOffsetStore(groupname, getFactory());
break;
case CLUSTERING:
m_pOffsetStore = new RemoteBrokerOffsetStore(groupname, getFactory());
break;
}
bool bStartFailed = false;
string errorMsg;
try {
m_pOffsetStore->load();
} catch (MQClientException& e) {
bStartFailed = true;
errorMsg = std::string(e.what());
}
m_consumerService->start();
getFactory()->start();
updateTopicSubscribeInfoWhenSubscriptionChanged();
getFactory()->sendHeartbeatToAllBroker();
m_serviceState = RUNNING;
if (bStartFailed) {
shutdown();
THROW_MQEXCEPTION(MQClientException, errorMsg, -1);
}
break;
}
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
break;
default:
break;
}
getFactory()->rebalanceImmediately();
}
void DefaultMQPushConsumer::shutdown() {
switch (m_serviceState) {
case RUNNING: {
LOG_INFO("DefaultMQPushConsumer shutdown");
m_async_ioService.stop();
m_async_service_thread->interrupt();
m_async_service_thread->join();
m_pullmsgQueue->close();
m_pullmsgThread->interrupt();
m_pullmsgThread->join();
m_consumerService->shutdown();
persistConsumerOffset();
shutdownAsyncPullCallBack(); // delete async pullMsg resources
getFactory()->unregisterConsumer(this);
getFactory()->shutdown();
m_serviceState = SHUTDOWN_ALREADY;
break;
}
case CREATE_JUST:
case SHUTDOWN_ALREADY:
break;
default:
break;
}
}
void DefaultMQPushConsumer::registerMessageListener(MQMessageListener* pMessageListener) {
if (NULL != pMessageListener) {
m_pMessageListener = pMessageListener;
}
}
MessageListenerType DefaultMQPushConsumer::getMessageListenerType() {
if (NULL != m_pMessageListener) {
return m_pMessageListener->getMessageListenerType();
}
return messageListenerDefaultly;
}
ConsumeMsgService* DefaultMQPushConsumer::getConsumerMsgService() const {
return m_consumerService;
}
OffsetStore* DefaultMQPushConsumer::getOffsetStore() const {
return m_pOffsetStore;
}
Rebalance* DefaultMQPushConsumer::getRebalance() const {
return m_pRebalance;
}
void DefaultMQPushConsumer::subscribe(const string& topic, const string& subExpression) {
m_subTopics[topic] = subExpression;
}
void DefaultMQPushConsumer::checkConfig() {
string groupname = getGroupName();
// check consumerGroup
Validators::checkGroup(groupname);
// consumerGroup
if (!groupname.compare(DEFAULT_CONSUMER_GROUP)) {
THROW_MQEXCEPTION(MQClientException, "consumerGroup can not equal DEFAULT_CONSUMER", -1);
}
if (getMessageModel() != BROADCASTING && getMessageModel() != CLUSTERING) {
THROW_MQEXCEPTION(MQClientException, "messageModel is valid ", -1);
}
if (m_pMessageListener == NULL) {
THROW_MQEXCEPTION(MQClientException, "messageListener is null ", -1);
}
}
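// copySubscription() pushes the user's topic/expression pairs into the
// rebalance module; in CLUSTERING mode it additionally subscribes to the
// group's retry topic so that messages sent back via sendMessageBack are
// re-consumed.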
void DefaultMQPushConsumer::copySubscription() {
map<string, string>::iterator it = m_subTopics.begin();
for (; it != m_subTopics.end(); ++it) {
LOG_INFO("buildSubscriptionData,:%s,%s", it->first.c_str(), it->second.c_str());
unique_ptr<SubscriptionData> pSData(FilterAPI::buildSubscriptionData(it->first, it->second));
m_pRebalance->setSubscriptionData(it->first, pSData.release());
}
switch (getMessageModel()) {
case BROADCASTING:
break;
case CLUSTERING: {
string retryTopic = UtilAll::getRetryTopic(getGroupName());
//<!this sub;
unique_ptr<SubscriptionData> pSData(FilterAPI::buildSubscriptionData(retryTopic, SUB_ALL));
m_pRebalance->setSubscriptionData(retryTopic, pSData.release());
break;
}
default:
break;
}
}
void DefaultMQPushConsumer::updateTopicSubscribeInfo(const string& topic, vector<MQMessageQueue>& info) {
m_pRebalance->setTopicSubscribeInfo(topic, info);
}
void DefaultMQPushConsumer::updateTopicSubscribeInfoWhenSubscriptionChanged() {
map<string, SubscriptionData*>& subTable = m_pRebalance->getSubscriptionInner();
map<string, SubscriptionData*>::iterator it = subTable.begin();
for (; it != subTable.end(); ++it) {
bool btopic = getFactory()->updateTopicRouteInfoFromNameServer(it->first, getSessionCredentials());
if (btopic == false) {
LOG_WARN("The topic:[%s] not exist", it->first.c_str());
}
}
}
ConsumeType DefaultMQPushConsumer::getConsumeType() {
return CONSUME_PASSIVELY;
}
ConsumeFromWhere DefaultMQPushConsumer::getConsumeFromWhere() {
return m_consumeFromWhere;
}
void DefaultMQPushConsumer::setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) {
m_consumeFromWhere = consumeFromWhere;
}
void DefaultMQPushConsumer::getSubscriptions(vector<SubscriptionData>& result) {
map<string, SubscriptionData*>& subTable = m_pRebalance->getSubscriptionInner();
map<string, SubscriptionData*>::iterator it = subTable.begin();
for (; it != subTable.end(); ++it) {
result.push_back(*(it->second));
}
}
void DefaultMQPushConsumer::updateConsumeOffset(const MQMessageQueue& mq, int64 offset) {
if (offset >= 0) {
m_pOffsetStore->updateOffset(mq, offset);
} else {
LOG_ERROR("updateConsumeOffset of mq:%s error", mq.toString().c_str());
}
}
void DefaultMQPushConsumer::removeConsumeOffset(const MQMessageQueue& mq) {
m_pOffsetStore->removeOffset(mq);
}
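// triggerNextPullRequest() is the deadline-timer callback used for
// back-pressure: when a queue's cached message count exceeds
// m_maxMsgCacheSize, the pull is retried after a short delay instead of
// immediately.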
void DefaultMQPushConsumer::triggerNextPullRequest(boost::asio::deadline_timer* t, PullRequest* request) {
// LOG_INFO("trigger pullrequest for:%s",
// (request->m_messageQueue).toString().c_str());
producePullMsgTask(request);
deleteAndZero(t);
}
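// producePullMsgTask() enqueues a pull task onto the task queue; the task
// runs pullMessageAsync() or pullMessage() depending on m_asyncPull.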
void DefaultMQPushConsumer::producePullMsgTask(PullRequest* request) {
if (m_pullmsgQueue->bTaskQueueStatusOK() && isServiceStateOk()) {
request->addPullMsgEvent();
if (m_asyncPull) {
m_pullmsgQueue->produce(TaskBinder::gen(&DefaultMQPushConsumer::pullMessageAsync, this, request));
} else {
m_pullmsgQueue->produce(TaskBinder::gen(&DefaultMQPushConsumer::pullMessage, this, request));
}
} else {
LOG_WARN("produce pullmsg of mq:%s failed", request->m_messageQueue.toString().c_str());
}
}
void DefaultMQPushConsumer::runPullMsgQueue(TaskQueue* pTaskQueue) {
pTaskQueue->run();
}
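// Synchronous pull path; the flow below:
// 1. drop/lock checks (orderly consumption must hold the queue lock);
// 2. back-pressure: retry after 1s if the cached message count exceeds
//    m_maxMsgCacheSize;
// 3. in CLUSTERING mode, piggyback the in-memory commit offset on the pull;
// 4. pull synchronously and dispatch on the resulting pull status.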
void DefaultMQPushConsumer::pullMessage(PullRequest* request) {
if (request == NULL) {
LOG_ERROR("Pull request is NULL, return");
return;
}
if (request->isDroped()) {
LOG_WARN("Pull request is set drop with mq:%s, return", (request->m_messageQueue).toString().c_str());
request->removePullMsgEvent();
return;
}
MQMessageQueue& messageQueue = request->m_messageQueue;
if (m_consumerService->getConsumeMsgSerivceListenerType() == messageListenerOrderly) {
if (!request->isLocked() || request->isLockExpired()) {
if (!m_pRebalance->lock(messageQueue)) {
producePullMsgTask(request);
return;
}
}
}
if (request->getCacheMsgCount() > m_maxMsgCacheSize) {
// LOG_INFO("retry pullrequest for:%s after 1s, as cachMsgSize:%d is larger
// than:%d", (request->m_messageQueue).toString().c_str(),
// request->getCacheMsgCount(), m_maxMsgCacheSize);
boost::asio::deadline_timer* t =
new boost::asio::deadline_timer(m_async_ioService, boost::posix_time::milliseconds(1 * 1000));
t->async_wait(boost::bind(&DefaultMQPushConsumer::triggerNextPullRequest, this, t, request));
return;
}
bool commitOffsetEnable = false;
int64 commitOffsetValue = 0;
if (CLUSTERING == getMessageModel()) {
commitOffsetValue = m_pOffsetStore->readOffset(messageQueue, READ_FROM_MEMORY, getSessionCredentials());
if (commitOffsetValue > 0) {
commitOffsetEnable = true;
}
}
string subExpression;
SubscriptionData* pSdata = m_pRebalance->getSubscriptionData(messageQueue.getTopic());
if (pSdata == NULL) {
producePullMsgTask(request);
return;
}
subExpression = pSdata->getSubString();
int sysFlag = PullSysFlag::buildSysFlag(commitOffsetEnable, // commitOffset
false, // suspend
!subExpression.empty(), // subscription
false); // class filter
try {
request->setLastPullTimestamp(UtilAll::currentTimeMillis());
unique_ptr<PullResult> result(m_pPullAPIWrapper->pullKernelImpl(messageQueue, // 1
subExpression, // 2
pSdata->getSubVersion(), // 3
request->getNextOffset(), // 4
32, // 5
sysFlag, // 6
commitOffsetValue, // 7
1000 * 15, // 8
1000 * 30, // 9
ComMode_SYNC, // 10
NULL, getSessionCredentials()));
PullResult pullResult = m_pPullAPIWrapper->processPullResult(messageQueue, result.get(), pSdata);
switch (pullResult.pullStatus) {
case FOUND: {
// If the request has been set to dropped, don't add msgFoundList to
// m_msgTreeMap and don't call producePullMsgTask. This avoids a race:
// a pull is sent out, rebalance runs concurrently and drops this
// request, and then the pulled messages arrive.
if (!request->isDroped()) {
request->setNextOffset(pullResult.nextBeginOffset);
request->putMessage(pullResult.msgFoundList);
m_consumerService->submitConsumeRequest(request, pullResult.msgFoundList);
producePullMsgTask(request);
LOG_DEBUG("FOUND:%s with size:" SIZET_FMT ",nextBeginOffset:%lld", messageQueue.toString().c_str(),
pullResult.msgFoundList.size(), pullResult.nextBeginOffset);
} else {
request->removePullMsgEvent();
}
break;
}
case NO_NEW_MSG: {
request->setNextOffset(pullResult.nextBeginOffset);
vector<MQMessageExt> msgs;
request->getMessage(msgs);
if ((msgs.size() == 0) && (pullResult.nextBeginOffset > 0)) {
/* If the broker lost/cleared the messages of a message queue but kept the
broker offset, the consumer enters the following situation:
1. rebalance reads a pull offset of 0 and sets m_offsetTable[mq] to 0;
2. pullMessage gets NO_NEW_MSG or NO_MATCHED_MSG, and nextBeginOffset
increases by 800;
3. request->getMessage(msgs) always returns nothing;
4. so the consumer offset must be updated to the nextBeginOffset
indicated by the broker.
Note: when there is genuinely no new message to pull, this case is
entered as well. */
// LOG_DEBUG("maybe a mismatch between broker and client happened, update consumerOffset to nextBeginOffset indicated by broker");
updateConsumeOffset(messageQueue, pullResult.nextBeginOffset);
}
producePullMsgTask(request);
LOG_DEBUG("NO_NEW_MSG:%s,nextBeginOffset:%lld", messageQueue.toString().c_str(), pullResult.nextBeginOffset);
break;
}
case NO_MATCHED_MSG: {
request->setNextOffset(pullResult.nextBeginOffset);
vector<MQMessageExt> msgs;
request->getMessage(msgs);
if ((msgs.size() == 0) && (pullResult.nextBeginOffset > 0)) {
// LOG_DEBUG("maybe misMatch between broker and client happens, update
// consumerOffset to nextBeginOffset indicated by broker");
updateConsumeOffset(messageQueue, pullResult.nextBeginOffset);
}
producePullMsgTask(request);
LOG_DEBUG("NO_MATCHED_MSG:%s,nextBeginOffset:%lld", messageQueue.toString().c_str(),
pullResult.nextBeginOffset);
break;
}
case OFFSET_ILLEGAL: {
request->setNextOffset(pullResult.nextBeginOffset);
producePullMsgTask(request);
LOG_DEBUG("OFFSET_ILLEGAL:%s,nextBeginOffset:%lld", messageQueue.toString().c_str(),
pullResult.nextBeginOffset);
break;
}
// BROKER_TIMEOUT is defined by the client; the broker never returns this
// status, so this case should never be entered.
case BROKER_TIMEOUT: {
LOG_ERROR("impossible BROKER_TIMEOUT occurs");
request->setNextOffset(pullResult.nextBeginOffset);
producePullMsgTask(request);
break;
}
}
} catch (MQException& e) {
LOG_ERROR(e.what());
producePullMsgTask(request);
}
}
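// One AsyncPullCallback is cached per message queue; it is created lazily on
// the first async pull for that queue and deleted in the destructor.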
AsyncPullCallback* DefaultMQPushConsumer::getAsyncPullCallBack(PullRequest* request, MQMessageQueue msgQueue) {
boost::lock_guard<boost::mutex> lock(m_asyncCallbackLock);
if (m_asyncPull && request) {
PullMAP::iterator it = m_PullCallback.find(msgQueue);
if (it == m_PullCallback.end()) {
LOG_INFO("new pull callback for mq:%s", msgQueue.toString().c_str());
m_PullCallback[msgQueue] = new AsyncPullCallback(this, request);
}
return m_PullCallback[msgQueue];
}
return NULL;
}
void DefaultMQPushConsumer::shutdownAsyncPullCallBack() {
boost::lock_guard<boost::mutex> lock(m_asyncCallbackLock);
if (m_asyncPull) {
PullMAP::iterator it = m_PullCallback.begin();
for (; it != m_PullCallback.end(); ++it) {
if (it->second) {
it->second->setShutdownStatus();
} else {
LOG_ERROR("could not find asyncPullCallback for:%s", it->first.toString().c_str());
}
}
}
}
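// Asynchronous pull path. Same pre-checks as pullMessage(), but the request
// is sent with ComMode_ASYNC and the suspend flag set so the broker can hold
// it (long polling) when no message is available; the result is delivered to
// the per-queue AsyncPullCallback.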
void DefaultMQPushConsumer::pullMessageAsync(PullRequest* request) {
if (request == NULL) {
LOG_ERROR("Pull request is NULL, return");
return;
}
if (request->isDroped()) {
LOG_WARN("Pull request is set drop with mq:%s, return", (request->m_messageQueue).toString().c_str());
request->removePullMsgEvent();
return;
}
MQMessageQueue& messageQueue = request->m_messageQueue;
if (m_consumerService->getConsumeMsgSerivceListenerType() == messageListenerOrderly) {
if (!request->isLocked() || request->isLockExpired()) {
if (!m_pRebalance->lock(messageQueue)) {
producePullMsgTask(request);
return;
}
}
}
if (request->getCacheMsgCount() > m_maxMsgCacheSize) {
// LOG_INFO("retry pullrequest for:%s after 1s, as cachMsgSize:%d is larger
// than:%d", (request->m_messageQueue).toString().c_str(),
// request->getCacheMsgCount(), m_maxMsgCacheSize);
boost::asio::deadline_timer* t =
new boost::asio::deadline_timer(m_async_ioService, boost::posix_time::milliseconds(1 * 1000));
t->async_wait(boost::bind(&DefaultMQPushConsumer::triggerNextPullRequest, this, t, request));
return;
}
bool commitOffsetEnable = false;
int64 commitOffsetValue = 0;
if (CLUSTERING == getMessageModel()) {
commitOffsetValue = m_pOffsetStore->readOffset(messageQueue, READ_FROM_MEMORY, getSessionCredentials());
if (commitOffsetValue > 0) {
commitOffsetEnable = true;
}
}
string subExpression;
SubscriptionData* pSdata = (m_pRebalance->getSubscriptionData(messageQueue.getTopic()));
if (pSdata == NULL) {
producePullMsgTask(request);
return;
}
subExpression = pSdata->getSubString();
int sysFlag = PullSysFlag::buildSysFlag(commitOffsetEnable, // commitOffset
true, // suspend
!subExpression.empty(), // subscription
false); // class filter
AsyncArg arg;
arg.mq = messageQueue;
arg.subData = *pSdata;
arg.pPullWrapper = m_pPullAPIWrapper;
arg.pPullRequest = request;
try {
request->setLastPullTimestamp(UtilAll::currentTimeMillis());
m_pPullAPIWrapper->pullKernelImpl(messageQueue, // 1
subExpression, // 2
pSdata->getSubVersion(), // 3
request->getNextOffset(), // 4
32, // 5
sysFlag, // 6
commitOffsetValue, // 7
1000 * 15, // 8
m_asyncPullTimeout, // 9
ComMode_ASYNC, // 10
getAsyncPullCallBack(request, messageQueue), // 11
getSessionCredentials(), // 12
&arg); // 13
} catch (MQException& e) {
LOG_ERROR(e.what());
producePullMsgTask(request);
}
}
void DefaultMQPushConsumer::setAsyncPull(bool asyncFlag) {
if (asyncFlag) {
LOG_INFO("set pushConsumer:%s to async default pull mode", getGroupName().c_str());
} else {
LOG_INFO("set pushConsumer:%s to sync pull mode", getGroupName().c_str());
}
m_asyncPull = asyncFlag;
}
void DefaultMQPushConsumer::setConsumeThreadCount(int threadCount) {
if (threadCount > 0) {
m_consumeThreadCount = threadCount;
} else {
LOG_ERROR("setConsumeThreadCount with invalid value");
}
}
int DefaultMQPushConsumer::getConsumeThreadCount() const {
return m_consumeThreadCount;
}
void DefaultMQPushConsumer::setPullMsgThreadPoolCount(int threadCount) {
m_pullMsgThreadPoolNum = threadCount;
}
int DefaultMQPushConsumer::getPullMsgThreadPoolCount() const {
return m_pullMsgThreadPoolNum;
}
int DefaultMQPushConsumer::getConsumeMessageBatchMaxSize() const {
return m_consumeMessageBatchMaxSize;
}
void DefaultMQPushConsumer::setConsumeMessageBatchMaxSize(int consumeMessageBatchMaxSize) {
if (consumeMessageBatchMaxSize >= 1)
m_consumeMessageBatchMaxSize = consumeMessageBatchMaxSize;
}
void DefaultMQPushConsumer::setMaxCacheMsgSizePerQueue(int maxCacheSize) {
if (maxCacheSize > 0 && maxCacheSize < 65535) {
LOG_INFO("set maxCacheSize to:%d for consumer:%s", maxCacheSize, getGroupName().c_str());
m_maxMsgCacheSize = maxCacheSize;
}
}
int DefaultMQPushConsumer::getMaxCacheMsgSizePerQueue() const {
return m_maxMsgCacheSize;
}
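// Builds a snapshot of the consumer's runtime state (consume mode, thread
// pool size, subscriptions, per-queue progress). The caller owns the
// returned ConsumerRunningInfo.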
ConsumerRunningInfo* DefaultMQPushConsumer::getConsumerRunningInfo() {
ConsumerRunningInfo* info = new ConsumerRunningInfo();
if (info) {
if (m_consumerService->getConsumeMsgSerivceListenerType() == messageListenerOrderly)
info->setProperty(ConsumerRunningInfo::PROP_CONSUME_ORDERLY, "true");
else
info->setProperty(ConsumerRunningInfo::PROP_CONSUME_ORDERLY, "flase");
info->setProperty(ConsumerRunningInfo::PROP_THREADPOOL_CORE_SIZE, UtilAll::to_string(m_consumeThreadCount));
info->setProperty(ConsumerRunningInfo::PROP_CONSUMER_START_TIMESTAMP, UtilAll::to_string(m_startTime));
vector<SubscriptionData> result;
getSubscriptions(result);
info->setSubscriptionSet(result);
map<MQMessageQueue, PullRequest*> requestTable = m_pRebalance->getPullRequestTable();
map<MQMessageQueue, PullRequest*>::iterator it = requestTable.begin();
for (; it != requestTable.end(); ++it) {
if (!it->second->isDroped()) {
map<MessageQueue, ProcessQueueInfo> queueTable;
MessageQueue queue((it->first).getTopic(), (it->first).getBrokerName(), (it->first).getQueueId());
ProcessQueueInfo processQueue;
processQueue.cachedMsgMinOffset = it->second->getCacheMinOffset();
processQueue.cachedMsgMaxOffset = it->second->getCacheMaxOffset();
processQueue.cachedMsgCount = it->second->getCacheMsgCount();
processQueue.setCommitOffset(
m_pOffsetStore->readOffset(it->first, MEMORY_FIRST_THEN_STORE, getSessionCredentials()));
processQueue.setDroped(it->second->isDroped());
processQueue.setLocked(it->second->isLocked());
processQueue.lastLockTimestamp = it->second->getLastLockTimestamp();
processQueue.lastPullTimestamp = it->second->getLastPullTimestamp();
processQueue.lastConsumeTimestamp = it->second->getLastConsumeTimestamp();
info->setMqTable(queue, processQueue);
}
}
return info;
}
return NULL;
}
//<!************************************************************************
} //<!end namespace;