blob: 1f58e86a458842d5305dc9752ba9acab9c829016 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "DefaultMQPullConsumer.h"
#include "AsyncArg.h"
#include "CommunicationMode.h"
#include "FilterAPI.h"
#include "Logging.h"
#include "MQClientFactory.h"
#include "MessageAccessor.h"
#include "NameSpaceUtil.h"
#include "OffsetStore.h"
#include "PullAPIWrapper.h"
#include "PullSysFlag.h"
#include "Rebalance.h"
#include "Validators.h"
namespace rocketmq {
//<!***************************************************************************
DefaultMQPullConsumer::DefaultMQPullConsumer(const string& groupname)
: m_pMessageQueueListener(NULL),
m_pOffsetStore(NULL),
m_pRebalance(NULL),
m_pPullAPIWrapper(NULL)
{
//<!set default group name;
string gname = groupname.empty() ? DEFAULT_CONSUMER_GROUP : groupname;
setGroupName(gname);
setMessageModel(CLUSTERING);
}
DefaultMQPullConsumer::~DefaultMQPullConsumer() {
m_pMessageQueueListener = NULL;
deleteAndZero(m_pRebalance);
deleteAndZero(m_pOffsetStore);
deleteAndZero(m_pPullAPIWrapper);
}
// MQConsumer
//<!************************************************************************
void DefaultMQPullConsumer::start() {
#ifndef WIN32
/* Ignore the SIGPIPE */
struct sigaction sa;
memset(&sa, 0, sizeof(struct sigaction));
sa.sa_handler = SIG_IGN;
sa.sa_flags = 0;
sigaction(SIGPIPE, &sa, 0);
#endif
dealWithNameSpace();
switch (m_serviceState) {
case CREATE_JUST: {
m_serviceState = START_FAILED;
MQClient::start();
LOG_INFO("DefaultMQPullConsumer:%s start", m_GroupName.c_str());
//<!create rebalance;
m_pRebalance = new RebalancePull(this, getFactory());
string groupname = getGroupName();
m_pPullAPIWrapper = new PullAPIWrapper(getFactory(), groupname);
//<!data;
checkConfig();
copySubscription();
//<! registe;
bool registerOK = getFactory()->registerConsumer(this);
if (!registerOK) {
m_serviceState = CREATE_JUST;
THROW_MQEXCEPTION(
MQClientException,
"The cousumer group[" + getGroupName() + "] has been created before, specify another name please.", -1);
}
//<!msg model;
switch (getMessageModel()) {
case BROADCASTING:
m_pOffsetStore = new LocalFileOffsetStore(groupname, getFactory());
break;
case CLUSTERING:
m_pOffsetStore = new RemoteBrokerOffsetStore(groupname, getFactory());
break;
}
bool bStartFailed = false;
string errorMsg;
try {
m_pOffsetStore->load();
} catch (MQClientException& e) {
bStartFailed = true;
errorMsg = std::string(e.what());
}
getFactory()->start();
m_serviceState = RUNNING;
if (bStartFailed) {
shutdown();
THROW_MQEXCEPTION(MQClientException, errorMsg, -1);
}
break;
}
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
break;
default:
break;
}
}
void DefaultMQPullConsumer::shutdown() {
switch (m_serviceState) {
case RUNNING: {
LOG_INFO("DefaultMQPullConsumer:%s shutdown", m_GroupName.c_str());
persistConsumerOffset();
getFactory()->unregisterConsumer(this);
getFactory()->shutdown();
m_serviceState = SHUTDOWN_ALREADY;
break;
}
case SHUTDOWN_ALREADY:
case CREATE_JUST:
break;
default:
break;
}
}
bool DefaultMQPullConsumer::sendMessageBack(MQMessageExt& msg, int delayLevel, string& brokerName) {
return true;
}
void DefaultMQPullConsumer::fetchSubscribeMessageQueues(const string& topic, vector<MQMessageQueue>& mqs) {
mqs.clear();
try {
const string localTopic = NameSpaceUtil::withNameSpace(topic, getNameSpace());
getFactory()->fetchSubscribeMessageQueues(localTopic, mqs, getSessionCredentials());
} catch (MQException& e) {
LOG_ERROR(e.what());
}
}
void DefaultMQPullConsumer::updateTopicSubscribeInfo(const string& topic, vector<MQMessageQueue>& info) {}
void DefaultMQPullConsumer::registerMessageQueueListener(const string& topic, MQueueListener* pListener) {
m_registerTopics.insert(topic);
if (pListener) {
m_pMessageQueueListener = pListener;
}
}
PullResult DefaultMQPullConsumer::pull(const MQMessageQueue& mq,
const string& subExpression,
int64 offset,
int maxNums) {
return pullSyncImpl(mq, subExpression, offset, maxNums, false);
}
void DefaultMQPullConsumer::pull(const MQMessageQueue& mq,
const string& subExpression,
int64 offset,
int maxNums,
PullCallback* pPullCallback) {
pullAsyncImpl(mq, subExpression, offset, maxNums, false, pPullCallback);
}
PullResult DefaultMQPullConsumer::pullBlockIfNotFound(const MQMessageQueue& mq,
const string& subExpression,
int64 offset,
int maxNums) {
return pullSyncImpl(mq, subExpression, offset, maxNums, true);
}
void DefaultMQPullConsumer::pullBlockIfNotFound(const MQMessageQueue& mq,
const string& subExpression,
int64 offset,
int maxNums,
PullCallback* pPullCallback) {
pullAsyncImpl(mq, subExpression, offset, maxNums, true, pPullCallback);
}
PullResult DefaultMQPullConsumer::pullSyncImpl(const MQMessageQueue& mq,
const string& subExpression,
int64 offset,
int maxNums,
bool block) {
if (offset < 0)
THROW_MQEXCEPTION(MQClientException, "offset < 0", -1);
if (maxNums <= 0)
THROW_MQEXCEPTION(MQClientException, "maxNums <= 0", -1);
//<!auto subscript,all sub;
subscriptionAutomatically(mq.getTopic());
int sysFlag = PullSysFlag::buildSysFlag(false, block, true, false);
//<!this sub;
unique_ptr<SubscriptionData> pSData(FilterAPI::buildSubscriptionData(mq.getTopic(), subExpression));
int timeoutMillis = block ? 1000 * 30 : 1000 * 10;
try {
unique_ptr<PullResult> pullResult(m_pPullAPIWrapper->pullKernelImpl(mq, // 1
pSData->getSubString(), // 2
0L, // 3
offset, // 4
maxNums, // 5
sysFlag, // 6
0, // 7
1000 * 20, // 8
timeoutMillis, // 9
ComMode_SYNC, // 10
NULL, //<!callback;
getSessionCredentials(), NULL));
PullResult pr = m_pPullAPIWrapper->processPullResult(mq, pullResult.get(), pSData.get());
if (m_useNameSpaceMode) {
MessageAccessor::withoutNameSpace(pr.msgFoundList, m_nameSpace);
}
return pr;
} catch (MQException& e) {
LOG_ERROR(e.what());
}
return PullResult(BROKER_TIMEOUT);
}
void DefaultMQPullConsumer::pullAsyncImpl(const MQMessageQueue& mq,
const string& subExpression,
int64 offset,
int maxNums,
bool block,
PullCallback* pPullCallback) {
if (offset < 0)
THROW_MQEXCEPTION(MQClientException, "offset < 0", -1);
if (maxNums <= 0)
THROW_MQEXCEPTION(MQClientException, "maxNums <= 0", -1);
if (!pPullCallback)
THROW_MQEXCEPTION(MQClientException, "pPullCallback is null", -1);
//<!auto subscript,all sub;
subscriptionAutomatically(mq.getTopic());
int sysFlag = PullSysFlag::buildSysFlag(false, block, true, false);
//<!this sub;
unique_ptr<SubscriptionData> pSData(FilterAPI::buildSubscriptionData(mq.getTopic(), subExpression));
int timeoutMillis = block ? 1000 * 30 : 1000 * 10;
//<!�첽����;
AsyncArg arg;
arg.mq = mq;
arg.subData = *pSData;
arg.pPullWrapper = m_pPullAPIWrapper;
try {
// not support name space
unique_ptr<PullResult> pullResult(m_pPullAPIWrapper->pullKernelImpl(mq, // 1
pSData->getSubString(), // 2
0L, // 3
offset, // 4
maxNums, // 5
sysFlag, // 6
0, // 7
1000 * 20, // 8
timeoutMillis, // 9
ComMode_ASYNC, // 10
pPullCallback, getSessionCredentials(), &arg));
} catch (MQException& e) {
LOG_ERROR(e.what());
}
}
void DefaultMQPullConsumer::subscriptionAutomatically(const string& topic) {
SubscriptionData* pSdata = m_pRebalance->getSubscriptionData(topic);
if (pSdata == NULL) {
unique_ptr<SubscriptionData> subscriptionData(FilterAPI::buildSubscriptionData(topic, SUB_ALL));
m_pRebalance->setSubscriptionData(topic, subscriptionData.release());
}
}
void DefaultMQPullConsumer::updateConsumeOffset(const MQMessageQueue& mq, int64 offset) {
m_pOffsetStore->updateOffset(mq, offset);
}
void DefaultMQPullConsumer::removeConsumeOffset(const MQMessageQueue& mq) {
m_pOffsetStore->removeOffset(mq);
}
int64 DefaultMQPullConsumer::fetchConsumeOffset(const MQMessageQueue& mq, bool fromStore) {
return m_pOffsetStore->readOffset(mq, fromStore ? READ_FROM_STORE : MEMORY_FIRST_THEN_STORE, getSessionCredentials());
}
void DefaultMQPullConsumer::persistConsumerOffset() {
/*As do not execute rebalance for pullConsumer now, requestTable is always
empty
map<MQMessageQueue, PullRequest*> requestTable =
m_pRebalance->getPullRequestTable();
map<MQMessageQueue, PullRequest*>::iterator it = requestTable.begin();
vector<MQMessageQueue> mqs;
for (; it != requestTable.end(); ++it)
{
if (it->second)
{
mqs.push_back(it->first);
}
}
m_pOffsetStore->persistAll(mqs);*/
}
void DefaultMQPullConsumer::persistConsumerOffsetByResetOffset() {}
void DefaultMQPullConsumer::persistConsumerOffset4PullConsumer(const MQMessageQueue& mq) {
if (isServiceStateOk()) {
m_pOffsetStore->persist(mq, getSessionCredentials());
}
}
void DefaultMQPullConsumer::fetchMessageQueuesInBalance(const string& topic, vector<MQMessageQueue> mqs) {}
void DefaultMQPullConsumer::checkConfig() {
string groupname = getGroupName();
// check consumerGroup
Validators::checkGroup(groupname);
// consumerGroup
if (!groupname.compare(DEFAULT_CONSUMER_GROUP)) {
THROW_MQEXCEPTION(MQClientException, "consumerGroup can not equal DEFAULT_CONSUMER", -1);
}
if (getMessageModel() != BROADCASTING && getMessageModel() != CLUSTERING) {
THROW_MQEXCEPTION(MQClientException, "messageModel is valid ", -1);
}
}
void DefaultMQPullConsumer::doRebalance() {}
void DefaultMQPullConsumer::copySubscription() {
set<string>::iterator it = m_registerTopics.begin();
for (; it != m_registerTopics.end(); ++it) {
unique_ptr<SubscriptionData> subscriptionData(FilterAPI::buildSubscriptionData((*it), SUB_ALL));
m_pRebalance->setSubscriptionData((*it), subscriptionData.release());
}
}
ConsumeType DefaultMQPullConsumer::getConsumeType() {
return CONSUME_ACTIVELY;
}
ConsumeFromWhere DefaultMQPullConsumer::getConsumeFromWhere() {
return CONSUME_FROM_LAST_OFFSET;
}
void DefaultMQPullConsumer::getSubscriptions(vector<SubscriptionData>& result) {
set<string>::iterator it = m_registerTopics.begin();
for (; it != m_registerTopics.end(); ++it) {
SubscriptionData ms(*it, SUB_ALL);
result.push_back(ms);
}
}
bool DefaultMQPullConsumer::producePullMsgTask(boost::weak_ptr<PullRequest> pullRequest) {
return true;
}
Rebalance* DefaultMQPullConsumer::getRebalance() const {
return NULL;
}
// we should deal with name space before producer start.
bool DefaultMQPullConsumer::dealWithNameSpace() {
string ns = getNameSpace();
if (ns.empty()) {
string nsAddr = getNamesrvAddr();
if (!NameSpaceUtil::checkNameSpaceExistInNameServer(nsAddr)) {
return true;
}
ns = NameSpaceUtil::getNameSpaceFromNsURL(nsAddr);
// reset namespace
setNameSpace(ns);
}
// reset group name
if (!NameSpaceUtil::hasNameSpace(getGroupName(), ns)) {
string fullGID = NameSpaceUtil::withNameSpace(getGroupName(), ns);
setGroupName(fullGID);
}
set<string> tmpTopics;
for (auto iter = m_registerTopics.begin(); iter != m_registerTopics.end(); iter++) {
string topic = *iter;
if (!NameSpaceUtil::hasNameSpace(topic, ns)) {
LOG_INFO("Update Subscribe Topic[%s] with NameSpace:%s", topic.c_str(), ns.c_str());
topic = NameSpaceUtil::withNameSpace(topic, ns);
// let other mode to known, the name space model opened.
m_useNameSpaceMode = true;
}
tmpTopics.insert(topic);
}
m_registerTopics.swap(tmpTopics);
return true;
}
//<!************************************************************************
} // namespace rocketmq