/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "Rebalance.h"
#include "DefaultMQPushConsumerImpl.h"
#include "LockBatchBody.h"
#include "Logging.h"
#include "MQClientAPIImpl.h"
#include "MQClientFactory.h"
#include "OffsetStore.h"
namespace rocketmq {
//<!************************************************************************
Rebalance::Rebalance(MQConsumer* consumer, MQClientFactory* pfactory)
: m_pConsumer(consumer), m_pClientFactory(pfactory) {
m_pAllocateMQStrategy = new AllocateMQAveragely();
}
Rebalance::~Rebalance() {
{
map<string, SubscriptionData*>::iterator it = m_subscriptionData.begin();
for (; it != m_subscriptionData.end(); ++it) {
deleteAndZero(it->second);
}
m_subscriptionData.clear();
}
// Entries of m_requestQueueTable are boost::shared_ptr<PullRequest> and are released automatically.
m_topicSubscribeInfoTable.clear();
m_pConsumer = NULL;
m_pClientFactory = NULL;
deleteAndZero(m_pAllocateMQStrategy);
}
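// Rebalance all subscribed topics. For each topic the full queue list is fetched;
// in BROADCASTING mode every consumer takes all queues, while in CLUSTERING mode the
// queues are divided among the group's consumer IDs by the allocate strategy
// (AllocateMQAveragely by default) and the local pull-request table is updated to
// match the new allocation.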
void Rebalance::doRebalance() {
LOG_DEBUG("start doRebalance");
try {
map<string, SubscriptionData*>::iterator it = m_subscriptionData.begin();
for (; it != m_subscriptionData.end(); ++it) {
string topic = (it->first);
LOG_DEBUG("current topic is:%s", topic.c_str());
//<!topic -> mqs
vector<MQMessageQueue> mqAll;
if (!getTopicSubscribeInfo(topic, mqAll)) {
continue;
}
if (mqAll.empty()) {
if (!UtilAll::startsWith_retry(topic)) {
std::string msg("#doRebalance. mqAll for topic:");
msg.append(topic);
msg.append(" is empty");
LOG_ERROR("Queues to allocate are empty. Msg: %s", msg.c_str());
// Route data is missing for a normal topic; abort this rebalance round with an exception.
THROW_MQEXCEPTION(MQClientException, msg, -1);
}
}
//<!msg model;
switch (m_pConsumer->getMessageModel()) {
case BROADCASTING: {
bool changed = updateRequestTableInRebalance(topic, mqAll);
if (changed) {
messageQueueChanged(topic, mqAll, mqAll);
}
break;
}
case CLUSTERING: {
vector<string> cidAll;
m_pClientFactory->findConsumerIds(topic, m_pConsumer->getGroupName(), cidAll,
m_pConsumer->getSessionCredentials());
if (cidAll.empty()) {
LOG_ERROR("[ERROR] Get empty consumer IDs. Consumer Group: %s, Topic: %s",
m_pConsumer->getGroupName().c_str(), topic.c_str());
// Should skip this round of re-balance immediately if consumer ID set is empty.
THROW_MQEXCEPTION(MQClientException, "doRebalance the cidAll is empty", -1);
}
// log
for (int i = 0; i < (int)cidAll.size(); ++i) {
LOG_DEBUG("client id:%s of topic:%s", cidAll[i].c_str(), topic.c_str());
}
//<! sort;
sort(mqAll.begin(), mqAll.end());
sort(cidAll.begin(), cidAll.end());
//<! allocate;
vector<MQMessageQueue> allocateResult;
try {
m_pAllocateMQStrategy->allocate(m_pConsumer->getMQClientId(), mqAll, cidAll, allocateResult);
} catch (MQException& e) {
std::string errMsg("Allocate message queue for ConsumerGroup[");
errMsg.append(m_pConsumer->getGroupName());
errMsg.append("],Topic[");
errMsg.append(topic);
errMsg.append("] failed. ");
LOG_ERROR("%s", errMsg.c_str());
THROW_MQEXCEPTION(MQClientException, errMsg, -1);
}
// log
for (int i = 0; i < (int)allocateResult.size(); ++i) {
LOG_DEBUG("allocate mq:%s", allocateResult[i].toString().c_str());
}
//<!update local;
bool changed = updateRequestTableInRebalance(topic, allocateResult);
if (changed) {
std::stringstream ss;
ss << "Allocation result for [Consumer Group: " << m_pConsumer->getGroupName() << ", Topic: " << topic
<< ", Current Consumer ID: " << m_pConsumer->getMQClientId() << "] has changed.\n "
<< "Total Queues: #" << mqAll.size() << ", Total Consumers: #" << cidAll.size()
<< ". Allocated Queues are: \n";
for (vector<MQMessageQueue>::size_type i = 0; i < allocateResult.size(); ++i) {
ss << allocateResult[i].toString() << "\n";
}
// Log the changed allocation result.
LOG_INFO("%s", ss.str().c_str());
messageQueueChanged(topic, mqAll, allocateResult);
}
break;
}
default:
break;
}
}
} catch (MQException& e) {
LOG_ERROR("%s", e.what());
}
}
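// Persist the consume offset of every queue still in charge (i.e. not dropped):
// all at once in BROADCASTING mode, queue by queue in CLUSTERING mode.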
void Rebalance::persistConsumerOffset() {
DefaultMQPushConsumerImpl* pConsumer = static_cast<DefaultMQPushConsumerImpl*>(m_pConsumer);
OffsetStore* pOffsetStore = pConsumer->getOffsetStore();
vector<MQMessageQueue> mqs;
{
boost::lock_guard<boost::mutex> lock(m_requestTableMutex);
MQ2PULLREQ::iterator it = m_requestQueueTable.begin();
for (; it != m_requestQueueTable.end(); ++it) {
if (it->second && (!it->second->isDropped())) {
mqs.push_back(it->first);
}
}
}
if (pConsumer->getMessageModel() == BROADCASTING) {
pOffsetStore->persistAll(mqs);
} else {
vector<MQMessageQueue>::iterator it2 = mqs.begin();
for (; it2 != mqs.end(); ++it2) {
pOffsetStore->persist(*it2, m_pConsumer->getSessionCredentials());
}
}
}
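// Persist offsets for every queue in the request table, including dropped ones,
// because a resetOffset command must take effect even on queues this consumer no
// longer pulls from.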
void Rebalance::persistConsumerOffsetByResetOffset() {
DefaultMQPushConsumerImpl* pConsumer = static_cast<DefaultMQPushConsumerImpl*>(m_pConsumer);
OffsetStore* pOffsetStore = pConsumer->getOffsetStore();
vector<MQMessageQueue> mqs;
{
boost::lock_guard<boost::mutex> lock(m_requestTableMutex);
MQ2PULLREQ::iterator it = m_requestQueueTable.begin();
for (; it != m_requestQueueTable.end(); ++it) {
if (it->second) { // even if the queue was dropped, its offset must still be
// updated when a resetOffset command is received
mqs.push_back(it->first);
}
}
}
vector<MQMessageQueue>::iterator it2 = mqs.begin();
for (; it2 != mqs.end(); ++it2) {
pOffsetStore->persist(*it2, m_pConsumer->getSessionCredentials());
}
}
SubscriptionData* Rebalance::getSubscriptionData(const string& topic) {
map<string, SubscriptionData*>::iterator it = m_subscriptionData.find(topic);
if (it != m_subscriptionData.end()) {
return it->second;
}
return NULL;
}
map<string, SubscriptionData*>& Rebalance::getSubscriptionInner() {
return m_subscriptionData;
}
void Rebalance::setSubscriptionData(const string& topic, SubscriptionData* pdata) {
if (pdata != NULL && m_subscriptionData.find(topic) == m_subscriptionData.end())
m_subscriptionData[topic] = pdata;
}
void Rebalance::setTopicSubscribeInfo(const string& topic, vector<MQMessageQueue>& mqs) {
if (m_subscriptionData.find(topic) != m_subscriptionData.end()) {
{
boost::lock_guard<boost::mutex> lock(m_topicSubscribeInfoTableMutex);
m_topicSubscribeInfoTable[topic] = mqs; // operator[] replaces any existing entry
}
// log
vector<MQMessageQueue>::iterator it = mqs.begin();
for (; it != mqs.end(); ++it) {
LOG_DEBUG("topic [%s] has :%s", topic.c_str(), (*it).toString().c_str());
}
}
}
bool Rebalance::getTopicSubscribeInfo(const string& topic, vector<MQMessageQueue>& mqs) {
boost::lock_guard<boost::mutex> lock(m_topicSubscribeInfoTableMutex);
if (m_topicSubscribeInfoTable.find(topic) != m_topicSubscribeInfoTable.end()) {
mqs = m_topicSubscribeInfoTable[topic];
return true;
}
return false;
}
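// m_requestQueueTable maps each assigned MQMessageQueue to its PullRequest; all
// accessors below hold m_requestTableMutex so the rebalance, pull and offset-persist
// paths can use the table concurrently.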
void Rebalance::addPullRequest(MQMessageQueue mq, boost::shared_ptr<PullRequest> pPullRequest) {
boost::lock_guard<boost::mutex> lock(m_requestTableMutex);
m_requestQueueTable[mq] = pPullRequest;
}
void Rebalance::removePullRequest(MQMessageQueue mq) {
boost::lock_guard<boost::mutex> lock(m_requestTableMutex);
if (m_requestQueueTable.find(mq) != m_requestQueueTable.end()) {
m_requestQueueTable.erase(mq);
}
}
bool Rebalance::isPullRequestExist(MQMessageQueue mq) {
boost::lock_guard<boost::mutex> lock(m_requestTableMutex);
return m_requestQueueTable.find(mq) != m_requestQueueTable.end();
}
boost::weak_ptr<PullRequest> Rebalance::getPullRequest(MQMessageQueue mq) {
boost::lock_guard<boost::mutex> lock(m_requestTableMutex);
if (m_requestQueueTable.find(mq) != m_requestQueueTable.end()) {
return m_requestQueueTable[mq];
}
return boost::weak_ptr<PullRequest>();
}
map<MQMessageQueue, boost::shared_ptr<PullRequest>> Rebalance::getPullRequestTable() {
boost::lock_guard<boost::mutex> lock(m_requestTableMutex);
return m_requestQueueTable;
}
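// Release the broker-side locks of all non-dropped queues, grouped per broker so each
// broker receives one UNLOCK_BATCH_MQ request (used for orderly consumption, e.g. on
// shutdown). Note: the oneWay flag is accepted but not forwarded to the API call here.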
void Rebalance::unlockAll(bool oneWay) {
map<string, vector<MQMessageQueue>*> brokerMqs;
MQ2PULLREQ requestQueueTable = getPullRequestTable();
for (MQ2PULLREQ::iterator it = requestQueueTable.begin(); it != requestQueueTable.end(); ++it) {
if (!(it->second->isDropped())) {
if (brokerMqs.find(it->first.getBrokerName()) == brokerMqs.end()) {
brokerMqs[it->first.getBrokerName()] = new vector<MQMessageQueue>;
}
// push unconditionally so the first queue seen for a broker is also collected
brokerMqs[it->first.getBrokerName()]->push_back(it->first);
}
}
LOG_INFO("unLockAll " SIZET_FMT " broker mqs", brokerMqs.size());
for (map<string, vector<MQMessageQueue>*>::iterator itb = brokerMqs.begin(); itb != brokerMqs.end(); ++itb) {
unique_ptr<FindBrokerResult> pFindBrokerResult(
m_pClientFactory->findBrokerAddressInSubscribe(itb->first, MASTER_ID, true));
if (!pFindBrokerResult) {
LOG_ERROR("unlockAll findBrokerAddressInSubscribe ret null for broker:%s", itb->first.data());
continue;
}
unique_ptr<UnlockBatchRequestBody> unlockBatchRequest(new UnlockBatchRequestBody());
vector<MQMessageQueue> mqs(*(itb->second));
unlockBatchRequest->setClientId(m_pConsumer->getMQClientId());
unlockBatchRequest->setConsumerGroup(m_pConsumer->getGroupName());
unlockBatchRequest->setMqSet(mqs);
try {
m_pClientFactory->getMQClientAPIImpl()->unlockBatchMQ(pFindBrokerResult->brokerAddr, unlockBatchRequest.get(),
1000, m_pConsumer->getSessionCredentials());
for (unsigned int i = 0; i != mqs.size(); ++i) {
boost::weak_ptr<PullRequest> pullreq = getPullRequest(mqs[i]);
if (!pullreq.expired()) {
LOG_INFO("unlockBatchMQ success of mq:%s", mqs[i].toString().c_str());
pullreq.lock()->setLocked(false);
} else {
LOG_ERROR("unlockBatchMQ fails of mq:%s", mqs[i].toString().c_str());
}
}
} catch (MQException& e) {
LOG_ERROR("unlockBatchMQ fails");
}
deleteAndZero(itb->second);
}
brokerMqs.clear();
}
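// Release the broker-side lock of a single queue via an UNLOCK_BATCH_MQ request with a
// one-element queue set.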
void Rebalance::unlock(MQMessageQueue mq) {
unique_ptr<FindBrokerResult> pFindBrokerResult(
m_pClientFactory->findBrokerAddressInSubscribe(mq.getBrokerName(), MASTER_ID, true));
if (!pFindBrokerResult) {
LOG_ERROR("unlock findBrokerAddressInSubscribe ret null for broker:%s", mq.getBrokerName().data());
return;
}
unique_ptr<UnlockBatchRequestBody> unlockBatchRequest(new UnlockBatchRequestBody());
vector<MQMessageQueue> mqs;
mqs.push_back(mq);
unlockBatchRequest->setClientId(m_pConsumer->getMQClientId());
unlockBatchRequest->setConsumerGroup(m_pConsumer->getGroupName());
unlockBatchRequest->setMqSet(mqs);
try {
m_pClientFactory->getMQClientAPIImpl()->unlockBatchMQ(pFindBrokerResult->brokerAddr, unlockBatchRequest.get(), 1000,
m_pConsumer->getSessionCredentials());
for (unsigned int i = 0; i != mqs.size(); ++i) {
boost::weak_ptr<PullRequest> pullreq = getPullRequest(mqs[i]);
if (!pullreq.expired()) {
LOG_INFO("unlock success of mq:%s", mqs[i].toString().c_str());
pullreq.lock()->setLocked(false);
} else {
LOG_ERROR("unlock fails of mq:%s", mqs[i].toString().c_str());
}
}
} catch (MQException& e) {
LOG_ERROR("unlock fails of mq:%s", mq.toString().c_str());
}
}
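// Acquire broker-side locks for all non-dropped queues, batched per broker+topic key;
// only queues the broker reports back as successfully locked are marked locked locally.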
void Rebalance::lockAll() {
map<string, vector<MQMessageQueue>*> brokerMqs;
MQ2PULLREQ requestQueueTable = getPullRequestTable();
for (MQ2PULLREQ::iterator it = requestQueueTable.begin(); it != requestQueueTable.end(); ++it) {
if (!(it->second->isDropped())) {
string brokerKey = it->first.getBrokerName() + it->first.getTopic();
if (brokerMqs.find(brokerKey) == brokerMqs.end()) {
brokerMqs[brokerKey] = new vector<MQMessageQueue>;
}
brokerMqs[brokerKey]->push_back(it->first);
}
}
LOG_INFO("LockAll " SIZET_FMT " broker mqs", brokerMqs.size());
for (map<string, vector<MQMessageQueue>*>::iterator itb = brokerMqs.begin(); itb != brokerMqs.end(); ++itb) {
string brokerName = (*(itb->second))[0].getBrokerName();
unique_ptr<FindBrokerResult> pFindBrokerResult(
m_pClientFactory->findBrokerAddressInSubscribe(brokerName, MASTER_ID, true));
if (!pFindBrokerResult) {
LOG_ERROR("lockAll findBrokerAddressInSubscribe ret null for broker:%s", brokerName.data());
continue;
}
unique_ptr<LockBatchRequestBody> lockBatchRequest(new LockBatchRequestBody());
lockBatchRequest->setClientId(m_pConsumer->getMQClientId());
lockBatchRequest->setConsumerGroup(m_pConsumer->getGroupName());
lockBatchRequest->setMqSet(*(itb->second));
LOG_INFO("try to lock:" SIZET_FMT " mqs of broker:%s", itb->second->size(), itb->first.c_str());
try {
vector<MQMessageQueue> messageQueues;
m_pClientFactory->getMQClientAPIImpl()->lockBatchMQ(pFindBrokerResult->brokerAddr, lockBatchRequest.get(),
messageQueues, 1000, m_pConsumer->getSessionCredentials());
for (unsigned int i = 0; i != messageQueues.size(); ++i) {
boost::weak_ptr<PullRequest> pullreq = getPullRequest(messageQueues[i]);
if (!pullreq.expired()) {
LOG_INFO("lockBatchMQ success of mq:%s", messageQueues[i].toString().c_str());
pullreq.lock()->setLocked(true);
pullreq.lock()->setLastLockTimestamp(UtilAll::currentTimeMillis());
} else {
LOG_ERROR("lockBatchMQ fails of mq:%s", messageQueues[i].toString().c_str());
}
}
messageQueues.clear();
} catch (MQException& e) {
LOG_ERROR("lockBatchMQ fails");
}
deleteAndZero(itb->second);
}
brokerMqs.clear();
}
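// Try to lock a single queue on its master broker; returns true only if the broker
// confirms the lock and a live pull request exists to record the lock state.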
bool Rebalance::lock(MQMessageQueue mq) {
unique_ptr<FindBrokerResult> pFindBrokerResult(
m_pClientFactory->findBrokerAddressInSubscribe(mq.getBrokerName(), MASTER_ID, true));
if (!pFindBrokerResult) {
LOG_ERROR("lock findBrokerAddressInSubscribe ret null for broker:%s", mq.getBrokerName().data());
return false;
}
unique_ptr<LockBatchRequestBody> lockBatchRequest(new LockBatchRequestBody());
lockBatchRequest->setClientId(m_pConsumer->getMQClientId());
lockBatchRequest->setConsumerGroup(m_pConsumer->getGroupName());
vector<MQMessageQueue> in_mqSet;
in_mqSet.push_back(mq);
lockBatchRequest->setMqSet(in_mqSet);
bool lockResult = false;
try {
vector<MQMessageQueue> messageQueues;
LOG_DEBUG("try to lock mq:%s", mq.toString().c_str());
m_pClientFactory->getMQClientAPIImpl()->lockBatchMQ(pFindBrokerResult->brokerAddr, lockBatchRequest.get(),
messageQueues, 1000, m_pConsumer->getSessionCredentials());
if (messageQueues.size() == 0) {
LOG_ERROR("lock mq on broker:%s failed", pFindBrokerResult->brokerAddr.c_str());
return false;
}
for (unsigned int i = 0; i != messageQueues.size(); ++i) {
boost::weak_ptr<PullRequest> pullreq = getPullRequest(messageQueues[i]);
if (!pullreq.expired()) {
LOG_INFO("lock success of mq:%s", messageQueues[i].toString().c_str());
pullreq.lock()->setLocked(true);
pullreq.lock()->setLastLockTimestamp(UtilAll::currentTimeMillis());
lockResult = true;
} else {
LOG_ERROR("lock fails of mq:%s", messageQueues[i].toString().c_str());
}
}
messageQueues.clear();
return lockResult;
} catch (MQException& e) {
LOG_ERROR("lock fails of mq:%s", mq.toString().c_str());
return false;
}
}
//<!************************************************************************
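// Pull consumers drive pulling from the application side, so the push-style rebalance
// hooks below are intentionally no-ops.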
RebalancePull::RebalancePull(MQConsumer* consumer, MQClientFactory* pfactory) : Rebalance(consumer, pfactory) {}
bool RebalancePull::updateRequestTableInRebalance(const string& topic, vector<MQMessageQueue>& mqsSelf) {
return false;
}
int64 RebalancePull::computePullFromWhere(const MQMessageQueue& mq) {
return 0;
}
void RebalancePull::messageQueueChanged(const string& topic,
vector<MQMessageQueue>& mqAll,
vector<MQMessageQueue>& mqDivided) {}
void RebalancePull::removeUnnecessaryMessageQueue(const MQMessageQueue& mq) {}
//<!***************************************************************************
RebalancePush::RebalancePush(MQConsumer* consumer, MQClientFactory* pfactory) : Rebalance(consumer, pfactory) {}
bool RebalancePush::updateRequestTableInRebalance(const string& topic, vector<MQMessageQueue>& mqsSelf) {
LOG_DEBUG("updateRequestTableInRebalance for Topic[%s] Enter", topic.c_str());
// 1. Drop queues this consumer is no longer in charge of:
// 1. set dropped
// 2. clear local messages
// 3. clear offset
// 4. remove from request table
// 5. set flag for route changed
// 2. Check and clear dropped/invalid pull requests (timeout and so on)
// 3. Add new queues this consumer is now in charge of:
// 1. create pull request
// 2. compute the next pull offset
// 3. init offset
// 4. add to request table
// 5. set flag for route changed
// 4. Start long polling for the new pull requests
if (mqsSelf.empty()) {
LOG_WARN("allocated queue is empty for topic:%s", topic.c_str());
}
bool changed = false;
//<!remove none responsive mq
MQ2PULLREQ requestQueueTable(getPullRequestTable());
MQ2PULLREQ::iterator itDel = requestQueueTable.begin();
for (; itDel != requestQueueTable.end(); ++itDel) {
MQMessageQueue mqtemp = itDel->first;
if (mqtemp.getTopic().compare(topic) == 0) {
if (mqsSelf.empty() || (std::find(mqsSelf.begin(), mqsSelf.end(), mqtemp) == mqsSelf.end())) {
// the queue is no longer allocated to this consumer, set to dropped
LOG_INFO("Drop mq:%s, because it is no longer allocated to this consumer", mqtemp.toString().c_str());
itDel->second->setDropped(true);
// remove offset table to avoid offset backup
removeUnnecessaryMessageQueue(mqtemp);
itDel->second->clearAllMsgs();
removePullRequest(mqtemp);
changed = true;
} else if (itDel->second->isPullRequestExpired()) {
// if the pull request expired, set to dropped. E.g. if adding the pull task failed, the pull request expires.
LOG_INFO("Drop mq:%s according to pull timeout.", mqtemp.toString().c_str());
itDel->second->setDropped(true);
removeUnnecessaryMessageQueue(mqtemp);
itDel->second->clearAllMsgs();
removePullRequest(mqtemp);
changed = true;
}
}
}
//<!add newly assigned mqs;
vector<boost::shared_ptr<PullRequest>> pullRequestsToAdd;
vector<MQMessageQueue>::iterator itAdd = mqsSelf.begin();
for (; itAdd != mqsSelf.end(); ++itAdd) {
if (isPullRequestExist(*itAdd)) {
// a live pull request already exists (expired ones were dropped above), keep it
continue;
}
boost::shared_ptr<PullRequest> pullRequest = boost::make_shared<PullRequest>(m_pConsumer->getGroupName());
pullRequest->m_messageQueue = *itAdd;
int64 nextOffset = computePullFromWhere(*itAdd);
if (nextOffset >= 0) {
pullRequest->setNextOffset(nextOffset);
changed = true;
addPullRequest(*itAdd, pullRequest);
pullRequestsToAdd.push_back(pullRequest);
LOG_INFO("Add mq:%s, request initial offset:%ld", (*itAdd).toString().c_str(), nextOffset);
} else {
LOG_WARN(
"Failed to add pull request for %s due to failure of querying consume offset, request initial offset:%ld",
(*itAdd).toString().c_str(), nextOffset);
}
}
for (vector<boost::shared_ptr<PullRequest>>::iterator itAdded = pullRequestsToAdd.begin();
itAdded != pullRequestsToAdd.end(); ++itAdded) {
LOG_INFO("Start to pull %s, offset:%ld, GroupName %s", (*itAdded)->m_messageQueue.toString().c_str(),
(*itAdded)->getNextOffset(), (*itAdded)->getGroupName().c_str());
if (!m_pConsumer->producePullMsgTask(*itAdded)) {
LOG_WARN(
"Failed to producer pull message task for %s, Remove it from Request table and wait for next #Rebalance.",
(*itAdded)->m_messageQueue.toString().c_str());
// remove from request table, and wait for next rebalance.
(*itAdded)->setDropped(true);
removePullRequest((*itAdded)->m_messageQueue);
}
}
LOG_DEBUG("updateRequestTableInRebalance Topic[%s] exit", topic.c_str());
return changed;
}
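// Compute the initial pull offset for a newly assigned queue. A stored offset always
// wins (readOffset >= 0); otherwise the ConsumeFromWhere policy applies:
// CONSUME_FROM_LAST_OFFSET starts from maxOffset (0 for retry topics),
// CONSUME_FROM_FIRST_OFFSET starts from 0, and CONSUME_FROM_TIMESTAMP is only
// partially supported (see the note below). Returns -1 on error, which makes the
// caller skip the queue.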
int64 RebalancePush::computePullFromWhere(const MQMessageQueue& mq) {
int64 result = -1;
DefaultMQPushConsumerImpl* pConsumer = dynamic_cast<DefaultMQPushConsumerImpl*>(m_pConsumer);
if (!pConsumer) {
LOG_ERROR("Cast consumer pointer to DefaultMQPushConsumer pointer failed when computePullFromWhere %s",
mq.toString().c_str());
return result;
}
ConsumeFromWhere consumeFromWhere = pConsumer->getConsumeFromWhere();
OffsetStore* pOffsetStore = pConsumer->getOffsetStore();
switch (consumeFromWhere) {
case CONSUME_FROM_LAST_OFFSET: {
int64 lastOffset = pOffsetStore->readOffset(mq, READ_FROM_STORE, m_pConsumer->getSessionCredentials());
if (lastOffset >= 0) {
LOG_INFO("CONSUME_FROM_LAST_OFFSET, lastOffset of mq:%s is:%lld", mq.toString().c_str(), lastOffset);
result = lastOffset;
} else if (-1 == lastOffset) {
LOG_WARN("CONSUME_FROM_LAST_OFFSET, lastOffset of mq:%s is -1", mq.toString().c_str());
if (UtilAll::startsWith_retry(mq.getTopic())) {
LOG_INFO("CONSUME_FROM_LAST_OFFSET, lastOffset of mq:%s is 0", mq.toString().c_str());
result = 0;
} else {
try {
result = pConsumer->maxOffset(mq);
LOG_INFO("CONSUME_FROM_LAST_OFFSET, maxOffset of mq:%s is:%lld", mq.toString().c_str(), result);
} catch (MQException& e) {
LOG_ERROR("CONSUME_FROM_LAST_OFFSET error, lastOffset of mq:%s is -1", mq.toString().c_str());
result = -1;
}
}
} else {
LOG_ERROR("CONSUME_FROM_LAST_OFFSET error, lastOffset of mq:%s is -1", mq.toString().c_str());
result = -1;
}
break;
}
case CONSUME_FROM_FIRST_OFFSET: {
int64 lastOffset = pOffsetStore->readOffset(mq, READ_FROM_STORE, m_pConsumer->getSessionCredentials());
if (lastOffset >= 0) {
LOG_INFO("CONSUME_FROM_FIRST_OFFSET, lastOffset of mq:%s is:%lld", mq.toString().c_str(), lastOffset);
result = lastOffset;
} else if (-1 == lastOffset) {
LOG_INFO("CONSUME_FROM_FIRST_OFFSET, lastOffset of mq:%s, return 0", mq.toString().c_str());
result = 0;
} else {
LOG_ERROR("CONSUME_FROM_FIRST_OFFSET, lastOffset of mq:%s, return -1", mq.toString().c_str());
result = -1;
}
break;
}
case CONSUME_FROM_TIMESTAMP: {
int64 lastOffset = pOffsetStore->readOffset(mq, READ_FROM_STORE, m_pConsumer->getSessionCredentials());
if (lastOffset >= 0) {
LOG_INFO("CONSUME_FROM_TIMESTAMP, lastOffset of mq:%s is:%lld", mq.toString().c_str(), lastOffset);
result = lastOffset;
} else if (-1 == lastOffset) {
if (UtilAll::startsWith_retry(mq.getTopic())) {
try {
result = pConsumer->maxOffset(mq);
LOG_INFO("CONSUME_FROM_TIMESTAMP, maxOffset of mq:%s is:%lld", mq.toString().c_str(), result);
} catch (MQException& e) {
LOG_ERROR("CONSUME_FROM_TIMESTAMP error, lastOffset of mq:%s is -1", mq.toString().c_str());
result = -1;
}
} else {
// Note: the Java client looks up the broker offset matching the configured consume
// timestamp at this point; that lookup is not implemented in this client, so the
// queue is skipped (result stays -1) until an offset has been stored.
LOG_WARN("CONSUME_FROM_TIMESTAMP, no stored offset for mq:%s and timestamp lookup is not implemented, return -1",
mq.toString().c_str());
result = -1;
}
} else {
LOG_ERROR("CONSUME_FROM_TIMESTAMP error, lastOffset of mq:%s, return -1", mq.toString().c_str());
result = -1;
}
break;
}
default:
break;
}
return result;
}
void RebalancePush::messageQueueChanged(const string& topic,
vector<MQMessageQueue>& mqAll,
vector<MQMessageQueue>& mqDivided) {}
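// Called when a queue is dropped during rebalance: persist its offset once more,
// remove it from the local offset table, and, for orderly consumers, release the
// broker-side lock so another consumer can take it over.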
void RebalancePush::removeUnnecessaryMessageQueue(const MQMessageQueue& mq) {
DefaultMQPushConsumerImpl* pConsumer = dynamic_cast<DefaultMQPushConsumerImpl*>(m_pConsumer);
if (!pConsumer) {
LOG_ERROR("Cast MQConsumer* to DefaultMQPushConsumer* failed when remove %s", mq.toString().c_str());
return;
}
OffsetStore* pOffsetStore = pConsumer->getOffsetStore();
pOffsetStore->persist(mq, m_pConsumer->getSessionCredentials());
pOffsetStore->removeOffset(mq);
if (pConsumer->getMessageListenerType() == messageListenerOrderly) {
unlock(mq);
}
}
//<!************************************************************************
} // namespace rocketmq