blob: 46832dd5ffb42be1e2bbb43564f2dd539defc96c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "PullAPIWrapper.h"
#include "CommunicationMode.h"
#include "MQClientFactory.h"
#include "PullResultExt.h"
#include "PullSysFlag.h"
namespace rocketmq {
//<!************************************************************************
PullAPIWrapper::PullAPIWrapper(MQClientFactory* mQClientFactory, const string& consumerGroup) {
m_MQClientFactory = mQClientFactory;
m_consumerGroup = consumerGroup;
}
PullAPIWrapper::~PullAPIWrapper() {
m_MQClientFactory = NULL;
m_pullFromWhichNodeTable.clear();
}
void PullAPIWrapper::updatePullFromWhichNode(const MQMessageQueue& mq, int brokerId) {
boost::lock_guard<boost::mutex> lock(m_lock);
m_pullFromWhichNodeTable[mq] = brokerId;
}
int PullAPIWrapper::recalculatePullFromWhichNode(const MQMessageQueue& mq) {
boost::lock_guard<boost::mutex> lock(m_lock);
if (m_pullFromWhichNodeTable.find(mq) != m_pullFromWhichNodeTable.end()) {
return m_pullFromWhichNodeTable[mq];
}
return MASTER_ID;
}
PullResult PullAPIWrapper::processPullResult(const MQMessageQueue& mq,
PullResult* pullResult,
SubscriptionData* subscriptionData) {
PullResultExt* pResultExt = static_cast<PullResultExt*>(pullResult);
if (pResultExt == NULL) {
string errMsg("The pullResult NULL of");
errMsg.append(mq.toString());
THROW_MQEXCEPTION(MQClientException, errMsg, -1);
}
//<!update;
updatePullFromWhichNode(mq, pResultExt->suggestWhichBrokerId);
vector<MQMessageExt> msgFilterList;
if (pResultExt->pullStatus == FOUND) {
//<!decode all msg list;
vector<MQMessageExt> msgAllList;
MQDecoder::decodes(&pResultExt->msgMemBlock, msgAllList);
//<!filter msg list again;
if (subscriptionData != NULL && !subscriptionData->getTagsSet().empty()) {
msgFilterList.reserve(msgAllList.size());
vector<MQMessageExt>::iterator it = msgAllList.begin();
for (; it != msgAllList.end(); ++it) {
string msgTag = (*it).getTags();
if (subscriptionData->containTag(msgTag)) {
msgFilterList.push_back(*it);
}
}
} else {
msgFilterList.swap(msgAllList);
}
}
return PullResult(pResultExt->pullStatus, pResultExt->nextBeginOffset, pResultExt->minOffset, pResultExt->maxOffset,
msgFilterList);
}
PullResult* PullAPIWrapper::pullKernelImpl(const MQMessageQueue& mq, // 1
string subExpression, // 2
int64 subVersion, // 3
int64 offset, // 4
int maxNums, // 5
int sysFlag, // 6
int64 commitOffset, // 7
int brokerSuspendMaxTimeMillis, // 8
int timeoutMillis, // 9
int communicationMode, // 10
PullCallback* pullCallback,
const SessionCredentials& session_credentials,
void* pArg /*= NULL*/) {
unique_ptr<FindBrokerResult> pFindBrokerResult(
m_MQClientFactory->findBrokerAddressInSubscribe(mq.getBrokerName(), recalculatePullFromWhichNode(mq), false));
//<!goto nameserver;
if (pFindBrokerResult == NULL) {
m_MQClientFactory->updateTopicRouteInfoFromNameServer(mq.getTopic(), session_credentials);
pFindBrokerResult.reset(
m_MQClientFactory->findBrokerAddressInSubscribe(mq.getBrokerName(), recalculatePullFromWhichNode(mq), false));
}
if (pFindBrokerResult != NULL) {
int sysFlagInner = sysFlag;
if (pFindBrokerResult->slave) {
sysFlagInner = PullSysFlag::clearCommitOffsetFlag(sysFlagInner);
}
PullMessageRequestHeader* pRequestHeader = new PullMessageRequestHeader();
pRequestHeader->consumerGroup = m_consumerGroup;
pRequestHeader->topic = mq.getTopic();
pRequestHeader->queueId = mq.getQueueId();
pRequestHeader->queueOffset = offset;
pRequestHeader->maxMsgNums = maxNums;
pRequestHeader->sysFlag = sysFlagInner;
pRequestHeader->commitOffset = commitOffset;
pRequestHeader->suspendTimeoutMillis = brokerSuspendMaxTimeMillis;
pRequestHeader->subscription = subExpression;
pRequestHeader->subVersion = subVersion;
return m_MQClientFactory->getMQClientAPIImpl()->pullMessage(pFindBrokerResult->brokerAddr, pRequestHeader,
timeoutMillis, communicationMode, pullCallback, pArg,
session_credentials);
}
THROW_MQEXCEPTION(MQClientException, "The broker not exist", -1);
}
} //<!end namespace;