blob: a2997a7144c1e5ec4750e9dc9dc1b3696c551597 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "Rebalance.h"
#include "DefaultMQPushConsumer.h"
#include "LockBatchBody.h"
#include "Logging.h"
#include "MQClientAPIImpl.h"
#include "MQClientFactory.h"
#include "OffsetStore.h"
namespace rocketmq {
//<!************************************************************************
// Builds a rebalancer bound to `consumer` and `pfactory`.
// Both pointers are stored as given; ~Rebalance() only resets them to NULL
// and never deletes them.
Rebalance::Rebalance(MQConsumer* consumer, MQClientFactory* pfactory)
: m_pConsumer(consumer), m_pClientFactory(pfactory) {
// The allocation strategy is created here and released in ~Rebalance().
m_pAllocateMQStrategy = new AllocateMQAveragely();
}
// Tears the rebalancer down: frees every SubscriptionData and PullRequest held
// in the internal tables, empties the route cache, detaches from the consumer
// and client factory (without deleting them) and releases the allocation
// strategy.
Rebalance::~Rebalance() {
  // Subscription entries stored in the table are deleted here.
  for (auto& subscription : m_subscriptionData) {
    deleteAndZero(subscription.second);
  }
  m_subscriptionData.clear();

  // Pull requests stored in the table are deleted here as well.
  for (auto& request : m_requestQueueTable) {
    delete request.second;
    request.second = NULL;
  }
  m_requestQueueTable.clear();

  m_topicSubscribeInfoTable.clear();

  // Consumer and factory are only forgotten, never deleted, by this class.
  m_pConsumer = NULL;
  m_pClientFactory = NULL;

  deleteAndZero(m_pAllocateMQStrategy);
}
// Recomputes, topic by topic, which message queues this consumer should pull
// from.
//
// For every subscribed topic that has cached route data:
//  - BROADCASTING: all queues of the topic are handed to
//    updateRequestTableInRebalance().
//  - CLUSTERING: the consumer ids of the group are fetched, queues and ids are
//    sorted, and m_pAllocateMQStrategy selects this client's share, which is
//    then handed to updateRequestTableInRebalance().
// messageQueueChanged() is invoked whenever the local request table changed.
//
// No MQException escapes: it is logged and ends the current pass.
// NOTE(review): the try block wraps the whole loop, so a failure on one topic
// (empty queue list, empty consumer-id list, allocation error) also skips every
// topic that follows it in m_subscriptionData for this pass - confirm that
// this is intended.
void Rebalance::doRebalance() {
  LOG_DEBUG("start doRebalance");
  try {
    for (map<string, SubscriptionData*>::iterator it = m_subscriptionData.begin(); it != m_subscriptionData.end();
         ++it) {
      string topic = it->first;
      LOG_INFO("current topic is:%s", topic.c_str());

      //<! topic -> mqs
      vector<MQMessageQueue> mqAll;
      if (!getTopicSubscribeInfo(topic, mqAll)) {
        continue;  // no route data cached for this topic yet
      }
      // An empty queue list is tolerated for retry topics only.
      if (mqAll.empty() && !UtilAll::startsWith_retry(topic)) {
        THROW_MQEXCEPTION(MQClientException, "doRebalance the topic is empty", -1);
      }

      //<! msg model
      switch (m_pConsumer->getMessageModel()) {
        case BROADCASTING: {
          if (updateRequestTableInRebalance(topic, mqAll)) {
            messageQueueChanged(topic, mqAll, mqAll);
          }
          break;
        }
        case CLUSTERING: {
          vector<string> cidAll;
          m_pClientFactory->findConsumerIds(topic, m_pConsumer->getGroupName(), cidAll,
                                            m_pConsumer->getSessionCredentials());
          if (cidAll.empty()) {
            // Existing pull requests are deliberately NOT dropped here, so that
            // consumption recovers quickly after a network outage.
            THROW_MQEXCEPTION(MQClientException, "doRebalance the cidAll is empty", -1);
          }
          for (size_t i = 0; i < cidAll.size(); ++i) {
            LOG_INFO("client id:%s of topic:%s", cidAll[i].c_str(), topic.c_str());
          }

          //<! sort: queues and ids are put in sorted order before allocation
          sort(mqAll.begin(), mqAll.end());
          sort(cidAll.begin(), cidAll.end());

          //<! allocate
          vector<MQMessageQueue> allocateResult;
          try {
            m_pAllocateMQStrategy->allocate(m_pConsumer->getMQClientId(), mqAll, cidAll, allocateResult);
          } catch (MQException& e) {
            // Keep the root cause in the log; the rethrown exception carries
            // only a generic message.
            LOG_ERROR("allocate error for topic:%s, cause:%s", topic.c_str(), e.what());
            THROW_MQEXCEPTION(MQClientException, "allocate error", -1);
          }
          for (size_t i = 0; i < allocateResult.size(); ++i) {
            LOG_INFO("allocate mq:%s", allocateResult[i].toString().c_str());
          }

          //<! update local
          if (updateRequestTableInRebalance(topic, allocateResult)) {
            messageQueueChanged(topic, mqAll, allocateResult);
          }
          break;
        }
        default:
          break;
      }
    }
  } catch (MQException& e) {
    // Never pass the message as the format string: it may contain '%'.
    LOG_ERROR("%s", e.what());
  }
}
void Rebalance::persistConsumerOffset() {
DefaultMQPushConsumer* pConsumer = static_cast<DefaultMQPushConsumer*>(m_pConsumer);
OffsetStore* pOffsetStore = pConsumer->getOffsetStore();
vector<MQMessageQueue> mqs;
{
boost::lock_guard<boost::mutex> lock(m_requestTableMutex);
MQ2PULLREQ::iterator it = m_requestQueueTable.begin();
for (; it != m_requestQueueTable.end(); ++it) {
if (it->second && (!it->second->isDroped())) {
mqs.push_back(it->first);
}
}
}
if (pConsumer->getMessageModel() == BROADCASTING) {
pOffsetStore->persistAll(mqs);
} else {
vector<MQMessageQueue>::iterator it2 = mqs.begin();
for (; it2 != mqs.end(); ++it2) {
pOffsetStore->persist(*it2, m_pConsumer->getSessionCredentials());
}
}
}
void Rebalance::persistConsumerOffsetByResetOffset() {
DefaultMQPushConsumer* pConsumer = static_cast<DefaultMQPushConsumer*>(m_pConsumer);
OffsetStore* pOffsetStore = pConsumer->getOffsetStore();
vector<MQMessageQueue> mqs;
{
boost::lock_guard<boost::mutex> lock(m_requestTableMutex);
MQ2PULLREQ::iterator it = m_requestQueueTable.begin();
for (; it != m_requestQueueTable.end(); ++it) {
if (it->second) { // even if it was dropped, also need update offset when
// rcv resetOffset cmd
mqs.push_back(it->first);
}
}
}
vector<MQMessageQueue>::iterator it2 = mqs.begin();
for (; it2 != mqs.end(); ++it2) {
pOffsetStore->persist(*it2, m_pConsumer->getSessionCredentials());
}
}
// Looks up the subscription registered for `topic`.
// Returns the stored pointer, or NULL when the topic is not subscribed.
SubscriptionData* Rebalance::getSubscriptionData(const string& topic) {
  map<string, SubscriptionData*>::iterator found = m_subscriptionData.find(topic);
  if (found == m_subscriptionData.end()) {
    return NULL;
  }
  return found->second;
}
// Exposes the internal topic -> SubscriptionData table by reference, so
// callers see (and can modify) the live container.
map<string, SubscriptionData*>& Rebalance::getSubscriptionInner() {
return m_subscriptionData;
}
void Rebalance::setSubscriptionData(const string& topic, SubscriptionData* pdata) {
if (pdata != NULL && m_subscriptionData.find(topic) == m_subscriptionData.end())
m_subscriptionData[topic] = pdata;
}
void Rebalance::setTopicSubscribeInfo(const string& topic, vector<MQMessageQueue>& mqs) {
if (m_subscriptionData.find(topic) != m_subscriptionData.end()) {
{
boost::lock_guard<boost::mutex> lock(m_topicSubscribeInfoTableMutex);
if (m_topicSubscribeInfoTable.find(topic) != m_topicSubscribeInfoTable.end())
m_topicSubscribeInfoTable.erase(topic);
m_topicSubscribeInfoTable[topic] = mqs;
}
// log
vector<MQMessageQueue>::iterator it = mqs.begin();
for (; it != mqs.end(); ++it) {
LOG_DEBUG("topic [%s] has :%s", topic.c_str(), (*it).toString().c_str());
}
}
}
bool Rebalance::getTopicSubscribeInfo(const string& topic, vector<MQMessageQueue>& mqs) {
boost::lock_guard<boost::mutex> lock(m_topicSubscribeInfoTableMutex);
if (m_topicSubscribeInfoTable.find(topic) != m_topicSubscribeInfoTable.end()) {
mqs = m_topicSubscribeInfoTable[topic];
return true;
}
return false;
}
// Stores `pPullRequest` as the pull request of `mq`, under m_requestTableMutex.
// An existing entry for `mq` is overwritten; the previous pointer is not
// deleted here.
void Rebalance::addPullRequest(MQMessageQueue mq, PullRequest* pPullRequest) {
boost::lock_guard<boost::mutex> lock(m_requestTableMutex);
m_requestQueueTable[mq] = pPullRequest;
}
// Returns the pull request registered for `mq`, or NULL when there is none.
// The lookup runs under m_requestTableMutex.
PullRequest* Rebalance::getPullRequest(MQMessageQueue mq) {
  boost::lock_guard<boost::mutex> guard(m_requestTableMutex);
  MQ2PULLREQ::iterator found = m_requestQueueTable.find(mq);
  if (found == m_requestQueueTable.end()) {
    return NULL;
  }
  return found->second;
}
// Returns a by-value copy of the queue -> PullRequest table, taken under
// m_requestTableMutex. Only the map is copied; the PullRequest pointers in the
// copy are the same ones held by the live table.
map<MQMessageQueue, PullRequest*> Rebalance::getPullRequestTable() {
boost::lock_guard<boost::mutex> lock(m_requestTableMutex);
return m_requestQueueTable;
}
void Rebalance::unlockAll(bool oneway) {
map<string, vector<MQMessageQueue>*> brokerMqs;
MQ2PULLREQ requestQueueTable = getPullRequestTable();
for (MQ2PULLREQ::iterator it = requestQueueTable.begin(); it != requestQueueTable.end(); ++it) {
if (!(it->second->isDroped())) {
if (brokerMqs.find(it->first.getBrokerName()) == brokerMqs.end()) {
vector<MQMessageQueue>* mqs = new vector<MQMessageQueue>;
brokerMqs[it->first.getBrokerName()] = mqs;
} else {
brokerMqs[it->first.getBrokerName()]->push_back(it->first);
}
}
}
LOG_INFO("unLockAll " SIZET_FMT " broker mqs", brokerMqs.size());
for (map<string, vector<MQMessageQueue>*>::iterator itb = brokerMqs.begin(); itb != brokerMqs.end(); ++itb) {
unique_ptr<FindBrokerResult> pFindBrokerResult(
m_pClientFactory->findBrokerAddressInSubscribe(itb->first, MASTER_ID, true));
if (!pFindBrokerResult) {
LOG_ERROR("unlockAll findBrokerAddressInSubscribe ret null for broker:%s", itb->first.data());
continue;
}
unique_ptr<UnlockBatchRequestBody> unlockBatchRequest(new UnlockBatchRequestBody());
vector<MQMessageQueue> mqs(*(itb->second));
unlockBatchRequest->setClientId(m_pConsumer->getMQClientId());
unlockBatchRequest->setConsumerGroup(m_pConsumer->getGroupName());
unlockBatchRequest->setMqSet(mqs);
try {
m_pClientFactory->getMQClientAPIImpl()->unlockBatchMQ(pFindBrokerResult->brokerAddr, unlockBatchRequest.get(),
1000, m_pConsumer->getSessionCredentials());
for (unsigned int i = 0; i != mqs.size(); ++i) {
PullRequest* pullreq = getPullRequest(mqs[i]);
if (pullreq) {
LOG_INFO("unlockBatchMQ success of mq:%s", mqs[i].toString().c_str());
pullreq->setLocked(true);
} else {
LOG_ERROR("unlockBatchMQ fails of mq:%s", mqs[i].toString().c_str());
}
}
} catch (MQException& e) {
LOG_ERROR("unlockBatchMQ fails");
}
deleteAndZero(itb->second);
}
brokerMqs.clear();
}
// Releases the broker-side lock of a single queue.
//
// Sends an unlockBatchMQ request containing only `mq` (1000 passed as the
// timeout argument) to the MASTER_ID address of the queue's broker. On success
// the pull request of `mq`, if one is registered, is marked as no longer
// locked. All failures are logged; nothing is thrown to the caller.
//
// Change from the previous version: the pull request is flagged
// setLocked(false) after unlocking (it used to be set to true).
void Rebalance::unlock(MQMessageQueue mq) {
  unique_ptr<FindBrokerResult> pFindBrokerResult(
      m_pClientFactory->findBrokerAddressInSubscribe(mq.getBrokerName(), MASTER_ID, true));
  if (!pFindBrokerResult) {
    LOG_ERROR("unlock findBrokerAddressInSubscribe ret null for broker:%s", mq.getBrokerName().data());
    return;
  }

  vector<MQMessageQueue> mqs(1, mq);
  unique_ptr<UnlockBatchRequestBody> unlockBatchRequest(new UnlockBatchRequestBody());
  unlockBatchRequest->setClientId(m_pConsumer->getMQClientId());
  unlockBatchRequest->setConsumerGroup(m_pConsumer->getGroupName());
  unlockBatchRequest->setMqSet(mqs);

  try {
    m_pClientFactory->getMQClientAPIImpl()->unlockBatchMQ(pFindBrokerResult->brokerAddr, unlockBatchRequest.get(), 1000,
                                                          m_pConsumer->getSessionCredentials());
    PullRequest* pullreq = getPullRequest(mq);
    if (pullreq) {
      LOG_INFO("unlock success of mq:%s", mq.toString().c_str());
      pullreq->setLocked(false);  // the broker no longer holds the lock for us
    } else {
      LOG_ERROR("unlock fails of mq:%s", mq.toString().c_str());
    }
  } catch (MQException& e) {
    LOG_ERROR("unlock fails of mq:%s, cause:%s", mq.toString().c_str(), e.what());
  }
}
void Rebalance::lockAll() {
map<string, vector<MQMessageQueue>*> brokerMqs;
MQ2PULLREQ requestQueueTable = getPullRequestTable();
for (MQ2PULLREQ::iterator it = requestQueueTable.begin(); it != requestQueueTable.end(); ++it) {
if (!(it->second->isDroped())) {
string brokerKey = it->first.getBrokerName() + it->first.getTopic();
if (brokerMqs.find(brokerKey) == brokerMqs.end()) {
vector<MQMessageQueue>* mqs = new vector<MQMessageQueue>;
brokerMqs[brokerKey] = mqs;
brokerMqs[brokerKey]->push_back(it->first);
} else {
brokerMqs[brokerKey]->push_back(it->first);
}
}
}
LOG_INFO("LockAll " SIZET_FMT " broker mqs", brokerMqs.size());
for (map<string, vector<MQMessageQueue>*>::iterator itb = brokerMqs.begin(); itb != brokerMqs.end(); ++itb) {
string brokerName = (*(itb->second))[0].getBrokerName();
unique_ptr<FindBrokerResult> pFindBrokerResult(
m_pClientFactory->findBrokerAddressInSubscribe(brokerName, MASTER_ID, true));
if (!pFindBrokerResult) {
LOG_ERROR("lockAll findBrokerAddressInSubscribe ret null for broker:%s", brokerName.data());
continue;
}
unique_ptr<LockBatchRequestBody> lockBatchRequest(new LockBatchRequestBody());
lockBatchRequest->setClientId(m_pConsumer->getMQClientId());
lockBatchRequest->setConsumerGroup(m_pConsumer->getGroupName());
lockBatchRequest->setMqSet(*(itb->second));
LOG_INFO("try to lock:" SIZET_FMT " mqs of broker:%s", itb->second->size(), itb->first.c_str());
try {
vector<MQMessageQueue> messageQueues;
m_pClientFactory->getMQClientAPIImpl()->lockBatchMQ(pFindBrokerResult->brokerAddr, lockBatchRequest.get(),
messageQueues, 1000, m_pConsumer->getSessionCredentials());
for (unsigned int i = 0; i != messageQueues.size(); ++i) {
PullRequest* pullreq = getPullRequest(messageQueues[i]);
if (pullreq) {
LOG_INFO("lockBatchMQ success of mq:%s", messageQueues[i].toString().c_str());
pullreq->setLocked(true);
pullreq->setLastLockTimestamp(UtilAll::currentTimeMillis());
} else {
LOG_ERROR("lockBatchMQ fails of mq:%s", messageQueues[i].toString().c_str());
}
}
messageQueues.clear();
} catch (MQException& e) {
LOG_ERROR("lockBatchMQ fails");
}
deleteAndZero(itb->second);
}
brokerMqs.clear();
}
// Tries to acquire the broker-side lock of a single queue.
//
// Sends a lockBatchMQ request containing only `mq` to the MASTER_ID address of
// the queue's broker. Every queue the broker reports as locked that has a
// registered pull request is marked locked and gets a fresh lock timestamp.
//
// Returns true when at least one pull request was marked locked; false when
// the broker address is unknown, the broker locked nothing, no matching pull
// request exists, or the request threw an MQException.
bool Rebalance::lock(MQMessageQueue mq) {
  unique_ptr<FindBrokerResult> brokerInfo(
      m_pClientFactory->findBrokerAddressInSubscribe(mq.getBrokerName(), MASTER_ID, true));
  if (!brokerInfo) {
    LOG_ERROR("lock findBrokerAddressInSubscribe ret null for broker:%s", mq.getBrokerName().data());
    return false;
  }

  unique_ptr<LockBatchRequestBody> request(new LockBatchRequestBody());
  request->setClientId(m_pConsumer->getMQClientId());
  request->setConsumerGroup(m_pConsumer->getGroupName());
  vector<MQMessageQueue> wanted(1, mq);
  request->setMqSet(wanted);

  try {
    // Filled by lockBatchMQ with the queues the broker actually locked.
    vector<MQMessageQueue> granted;
    LOG_DEBUG("try to lock mq:%s", mq.toString().c_str());
    m_pClientFactory->getMQClientAPIImpl()->lockBatchMQ(brokerInfo->brokerAddr, request.get(), granted, 1000,
                                                        m_pConsumer->getSessionCredentials());
    if (granted.empty()) {
      LOG_ERROR("lock mq on broker:%s failed", brokerInfo->brokerAddr.c_str());
      return false;
    }

    bool anyLocked = false;
    for (size_t idx = 0; idx < granted.size(); ++idx) {
      PullRequest* request4mq = getPullRequest(granted[idx]);
      if (!request4mq) {
        LOG_ERROR("lock fails of mq:%s", granted[idx].toString().c_str());
        continue;
      }
      LOG_INFO("lock success of mq:%s", granted[idx].toString().c_str());
      request4mq->setLocked(true);
      request4mq->setLastLockTimestamp(UtilAll::currentTimeMillis());
      anyLocked = true;
    }
    return anyLocked;
  } catch (MQException& e) {
    LOG_ERROR("lock fails of mq:%s", mq.toString().c_str());
    return false;
  }
}
//<!************************************************************************
// RebalancePull: every override below is a stub in this file.

// Forwards both arguments to the Rebalance base constructor.
RebalancePull::RebalancePull(MQConsumer* consumer, MQClientFactory* pfactory) : Rebalance(consumer, pfactory) {}
// Does nothing and always reports "no change" (false).
bool RebalancePull::updateRequestTableInRebalance(const string& topic, vector<MQMessageQueue>& mqsSelf) {
return false;
}
// Always returns offset 0.
int64 RebalancePull::computePullFromWhere(const MQMessageQueue& mq) {
return 0;
}
// Intentionally empty: no action is taken when the queue assignment changes.
void RebalancePull::messageQueueChanged(const string& topic,
vector<MQMessageQueue>& mqAll,
vector<MQMessageQueue>& mqDivided) {}
// Intentionally empty: nothing is cleaned up for a removed queue.
void RebalancePull::removeUnnecessaryMessageQueue(const MQMessageQueue& mq) {}
//<!***************************************************************************
// Forwards both arguments to the Rebalance base constructor.
// NOTE(review): other RebalancePush methods static_cast m_pConsumer to
// DefaultMQPushConsumer*, so `consumer` is presumably expected to be one - confirm.
RebalancePush::RebalancePush(MQConsumer* consumer, MQClientFactory* pfactory) : Rebalance(consumer, pfactory) {}
bool RebalancePush::updateRequestTableInRebalance(const string& topic, vector<MQMessageQueue>& mqsSelf) {
LOG_DEBUG("updateRequestTableInRebalance Enter");
if (mqsSelf.empty()) {
LOG_WARN("allocated queue is empty for topic:%s", topic.c_str());
}
bool changed = false;
//<!remove
MQ2PULLREQ requestQueueTable(getPullRequestTable());
MQ2PULLREQ::iterator it = requestQueueTable.begin();
for (; it != requestQueueTable.end(); ++it) {
MQMessageQueue mqtemp = it->first;
if (mqtemp.getTopic().compare(topic) == 0) {
if (mqsSelf.empty() || (find(mqsSelf.begin(), mqsSelf.end(), mqtemp) == mqsSelf.end())) {
if (!(it->second->isDroped())) {
it->second->setDroped(true);
// delete the lastest pull request for this mq, which hasn't been response
// m_pClientFactory->removeDropedPullRequestOpaque(it->second);
removeUnnecessaryMessageQueue(mqtemp);
it->second->clearAllMsgs(); // add clear operation to avoid bad state
// when dropped pullRequest returns
// normal
LOG_INFO("drop mq:%s, delete opaque:%d", mqtemp.toString().c_str(), it->second->getLatestPullRequestOpaque());
}
changed = true;
}
}
}
//<!add
vector<PullRequest*> pullrequestAdd;
DefaultMQPushConsumer* pConsumer = static_cast<DefaultMQPushConsumer*>(m_pConsumer);
vector<MQMessageQueue>::iterator it2 = mqsSelf.begin();
for (; it2 != mqsSelf.end(); ++it2) {
PullRequest* pPullRequest(getPullRequest(*it2));
if (pPullRequest && pPullRequest->isDroped()) {
LOG_DEBUG(
"before resume the pull handle of this pullRequest, its mq is:%s, "
"its offset is:%lld",
(it2->toString()).c_str(), pPullRequest->getNextOffset());
pConsumer->getOffsetStore()->removeOffset(*it2); // remove dirty offset which maybe update to
// OffsetStore::m_offsetTable by consuming After last
// drop
int64 nextOffset = computePullFromWhere(*it2);
if (nextOffset >= 0) {
/*
Fix issue with following scenario:
1. pullRequest was dropped
2. the pullMsgEvent was not executed by taskQueue, so the PullMsgEvent
was not stop
3. pullReuest was resumed by next doRebalance, then mulitple
pullMsgEvent were produced for pullRequest
*/
bool bPullMsgEvent = pPullRequest->addPullMsgEvent();
while (!bPullMsgEvent) {
boost::this_thread::sleep_for(boost::chrono::milliseconds(50));
LOG_INFO("pullRequest with mq :%s has unfinished pullMsgEvent", (it2->toString()).c_str());
bPullMsgEvent = pPullRequest->addPullMsgEvent();
}
pPullRequest->setDroped(false);
pPullRequest->clearAllMsgs(); // avoid consume accumulation and consume
// dumplication issues
pPullRequest->setNextOffset(nextOffset);
pPullRequest->updateQueueMaxOffset(nextOffset);
LOG_INFO(
"after resume the pull handle of this pullRequest, its mq is:%s, "
"its offset is:%lld",
(it2->toString()).c_str(), pPullRequest->getNextOffset());
changed = true;
pConsumer->producePullMsgTask(pPullRequest);
} else {
LOG_ERROR("get fatel error QueryOffset of mq:%s, do not reconsume this queue", (it2->toString()).c_str());
}
}
if (!pPullRequest) {
LOG_INFO("updateRequestTableInRebalance Doesn't find old mq");
PullRequest* pullRequest = new PullRequest(m_pConsumer->getGroupName());
pullRequest->m_messageQueue = *it2;
int64 nextOffset = computePullFromWhere(*it2);
if (nextOffset >= 0) {
pullRequest->setNextOffset(nextOffset);
pullRequest->clearAllMsgs(); // avoid consume accumulation and consume
// dumplication issues
changed = true;
//<! mq-> pq;
addPullRequest(*it2, pullRequest);
pullrequestAdd.push_back(pullRequest);
LOG_INFO("add mq:%s, request initiall offset:%lld", (*it2).toString().c_str(), nextOffset);
}
}
}
vector<PullRequest*>::iterator it3 = pullrequestAdd.begin();
for (; it3 != pullrequestAdd.end(); ++it3) {
LOG_DEBUG("start pull request");
pConsumer->producePullMsgTask(*it3);
}
LOG_DEBUG("updateRequestTableInRebalance exit");
return changed;
}
// Determines the offset from which pulling of `mq` should start.
//
// For the three handled policies the offset read from the offset store
// (READ_FROM_STORE) wins whenever it is >= 0. When the store answers -1:
//   CONSUME_FROM_LAST_OFFSET  - retry topics start at 0, other topics at the
//                               queue's current max offset;
//   CONSUME_FROM_FIRST_OFFSET - start at 0;
//   CONSUME_FROM_TIMESTAMP    - retry topics start at the queue's max offset;
//                               for other topics no timestamp lookup is
//                               implemented here, so -1 is returned.
// Any other store answer, a failing maxOffset() call, or an unhandled policy
// yields -1.
//
// @return a non-negative start offset, or -1 when none could be determined.
//
// Changes from the previous version: the empty try/catch in the
// CONSUME_FROM_TIMESTAMP branch (which silently produced -1) is replaced by an
// explicit error log, and log messages now report the values actually seen and
// returned.
int64 RebalancePush::computePullFromWhere(const MQMessageQueue& mq) {
  int64 result = -1;
  DefaultMQPushConsumer* pConsumer = static_cast<DefaultMQPushConsumer*>(m_pConsumer);
  ConsumeFromWhere consumeFromWhere = pConsumer->getConsumeFromWhere();
  OffsetStore* pOffsetStore = pConsumer->getOffsetStore();

  switch (consumeFromWhere) {
    case CONSUME_FROM_LAST_OFFSET: {
      int64 lastOffset = pOffsetStore->readOffset(mq, READ_FROM_STORE, m_pConsumer->getSessionCredentials());
      if (lastOffset >= 0) {
        LOG_INFO("CONSUME_FROM_LAST_OFFSET, lastOffset of mq:%s is:%lld", mq.toString().c_str(), lastOffset);
        result = lastOffset;
      } else if (-1 == lastOffset) {
        LOG_WARN("CONSUME_FROM_LAST_OFFSET, lastOffset of mq:%s is -1", mq.toString().c_str());
        if (UtilAll::startsWith_retry(mq.getTopic())) {
          LOG_INFO("CONSUME_FROM_LAST_OFFSET, retry topic, start offset of mq:%s is 0", mq.toString().c_str());
          result = 0;
        } else {
          try {
            result = pConsumer->maxOffset(mq);
            LOG_INFO("CONSUME_FROM_LAST_OFFSET, maxOffset of mq:%s is:%lld", mq.toString().c_str(), result);
          } catch (MQException& e) {
            LOG_ERROR("CONSUME_FROM_LAST_OFFSET error, get maxOffset of mq:%s failed:%s, return -1",
                      mq.toString().c_str(), e.what());
            result = -1;
          }
        }
      } else {
        LOG_ERROR("CONSUME_FROM_LAST_OFFSET error, lastOffset of mq:%s is:%lld, return -1", mq.toString().c_str(),
                  lastOffset);
        result = -1;
      }
      break;
    }
    case CONSUME_FROM_FIRST_OFFSET: {
      int64 lastOffset = pOffsetStore->readOffset(mq, READ_FROM_STORE, m_pConsumer->getSessionCredentials());
      if (lastOffset >= 0) {
        LOG_INFO("CONSUME_FROM_FIRST_OFFSET, lastOffset of mq:%s is:%lld", mq.toString().c_str(), lastOffset);
        result = lastOffset;
      } else if (-1 == lastOffset) {
        LOG_INFO("CONSUME_FROM_FIRST_OFFSET, lastOffset of mq:%s, return 0", mq.toString().c_str());
        result = 0;
      } else {
        LOG_ERROR("CONSUME_FROM_FIRST_OFFSET, lastOffset of mq:%s, return -1", mq.toString().c_str());
        result = -1;
      }
      break;
    }
    case CONSUME_FROM_TIMESTAMP: {
      int64 lastOffset = pOffsetStore->readOffset(mq, READ_FROM_STORE, m_pConsumer->getSessionCredentials());
      if (lastOffset >= 0) {
        LOG_INFO("CONSUME_FROM_TIMESTAMP, lastOffset of mq:%s is:%lld", mq.toString().c_str(), lastOffset);
        result = lastOffset;
      } else if (-1 == lastOffset) {
        if (UtilAll::startsWith_retry(mq.getTopic())) {
          try {
            result = pConsumer->maxOffset(mq);
            LOG_INFO("CONSUME_FROM_TIMESTAMP, maxOffset of mq:%s is:%lld", mq.toString().c_str(), result);
          } catch (MQException& e) {
            LOG_ERROR("CONSUME_FROM_TIMESTAMP error, get maxOffset of mq:%s failed:%s, return -1",
                      mq.toString().c_str(), e.what());
            result = -1;
          }
        } else {
          // No offset lookup by timestamp is performed in this function, so
          // there is no usable start offset for this queue.
          LOG_ERROR(
              "CONSUME_FROM_TIMESTAMP error, no stored offset of mq:%s and offset lookup by timestamp is not "
              "implemented, return -1",
              mq.toString().c_str());
          result = -1;
        }
      } else {
        LOG_ERROR("CONSUME_FROM_TIMESTAMP error, lastOffset of mq:%s, return -1", mq.toString().c_str());
        result = -1;
      }
      break;
    }
    default:
      break;
  }
  return result;
}
// Intentionally empty: the push rebalancer takes no action here when the queue
// assignment of a topic changes.
void RebalancePush::messageQueueChanged(const string& topic,
vector<MQMessageQueue>& mqAll,
vector<MQMessageQueue>& mqDivided) {}
void RebalancePush::removeUnnecessaryMessageQueue(const MQMessageQueue& mq) {
DefaultMQPushConsumer* pConsumer = static_cast<DefaultMQPushConsumer*>(m_pConsumer);
OffsetStore* pOffsetStore = pConsumer->getOffsetStore();
pOffsetStore->persist(mq, m_pConsumer->getSessionCredentials());
pOffsetStore->removeOffset(mq);
if (pConsumer->getMessageListenerType() == messageListenerOrderly) {
unlock(mq);
}
}
//<!************************************************************************
} //<!end namespace;