| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| #include "PullAPIWrapper.h" |
| #include "CommunicationMode.h" |
| #include "MQClientFactory.h" |
| #include "PullResultExt.h" |
| #include "PullSysFlag.h" |
| namespace rocketmq { |
| //<!************************************************************************ |
| PullAPIWrapper::PullAPIWrapper(MQClientFactory* mQClientFactory, const string& consumerGroup) { |
| m_MQClientFactory = mQClientFactory; |
| m_consumerGroup = consumerGroup; |
| } |
| |
| PullAPIWrapper::~PullAPIWrapper() { |
| m_MQClientFactory = NULL; |
| m_pullFromWhichNodeTable.clear(); |
| } |
| |
| void PullAPIWrapper::updatePullFromWhichNode(const MQMessageQueue& mq, int brokerId) { |
| boost::lock_guard<boost::mutex> lock(m_lock); |
| m_pullFromWhichNodeTable[mq] = brokerId; |
| } |
| |
| int PullAPIWrapper::recalculatePullFromWhichNode(const MQMessageQueue& mq) { |
| boost::lock_guard<boost::mutex> lock(m_lock); |
| if (m_pullFromWhichNodeTable.find(mq) != m_pullFromWhichNodeTable.end()) { |
| return m_pullFromWhichNodeTable[mq]; |
| } |
| return MASTER_ID; |
| } |
| |
| PullResult PullAPIWrapper::processPullResult(const MQMessageQueue& mq, |
| PullResult* pullResult, |
| SubscriptionData* subscriptionData) { |
| PullResultExt* pResultExt = static_cast<PullResultExt*>(pullResult); |
| if (pResultExt == NULL) { |
| string errMsg("The pullResult NULL of"); |
| errMsg.append(mq.toString()); |
| THROW_MQEXCEPTION(MQClientException, errMsg, -1); |
| } |
| |
| //<!update; |
| updatePullFromWhichNode(mq, pResultExt->suggestWhichBrokerId); |
| |
| vector<MQMessageExt> msgFilterList; |
| if (pResultExt->pullStatus == FOUND) { |
| //<!decode all msg list; |
| vector<MQMessageExt> msgAllList; |
| MQDecoder::decodes(&pResultExt->msgMemBlock, msgAllList); |
| |
| //<!filter msg list again; |
| if (subscriptionData != NULL && !subscriptionData->getTagsSet().empty()) { |
| msgFilterList.reserve(msgAllList.size()); |
| vector<MQMessageExt>::iterator it = msgAllList.begin(); |
| for (; it != msgAllList.end(); ++it) { |
| string msgTag = (*it).getTags(); |
| if (subscriptionData->containTag(msgTag)) { |
| msgFilterList.push_back(*it); |
| } |
| } |
| } else { |
| msgFilterList.swap(msgAllList); |
| } |
| } |
| |
| return PullResult(pResultExt->pullStatus, pResultExt->nextBeginOffset, pResultExt->minOffset, pResultExt->maxOffset, |
| msgFilterList); |
| } |
| |
| PullResult* PullAPIWrapper::pullKernelImpl(const MQMessageQueue& mq, // 1 |
| const string& subExpression, // 2 |
| int64 subVersion, // 3 |
| int64 offset, // 4 |
| int maxNums, // 5 |
| int sysFlag, // 6 |
| int64 commitOffset, // 7 |
| int brokerSuspendMaxTimeMillis, // 8 |
| int timeoutMillis, // 9 |
| int communicationMode, // 10 |
| PullCallback* pullCallback, |
| const SessionCredentials& session_credentials, |
| void* pArg /*= NULL*/) { |
| unique_ptr<FindBrokerResult> pFindBrokerResult( |
| m_MQClientFactory->findBrokerAddressInSubscribe(mq.getBrokerName(), recalculatePullFromWhichNode(mq), false)); |
| //<!goto nameserver; |
| if (pFindBrokerResult == NULL) { |
| m_MQClientFactory->updateTopicRouteInfoFromNameServer(mq.getTopic(), session_credentials); |
| pFindBrokerResult.reset( |
| m_MQClientFactory->findBrokerAddressInSubscribe(mq.getBrokerName(), recalculatePullFromWhichNode(mq), false)); |
| } |
| |
| if (pFindBrokerResult != NULL) { |
| int sysFlagInner = sysFlag; |
| |
| if (pFindBrokerResult->slave) { |
| sysFlagInner = PullSysFlag::clearCommitOffsetFlag(sysFlagInner); |
| } |
| |
| PullMessageRequestHeader* pRequestHeader = new PullMessageRequestHeader(); |
| pRequestHeader->consumerGroup = m_consumerGroup; |
| pRequestHeader->topic = mq.getTopic(); |
| pRequestHeader->queueId = mq.getQueueId(); |
| pRequestHeader->queueOffset = offset; |
| pRequestHeader->maxMsgNums = maxNums; |
| pRequestHeader->sysFlag = sysFlagInner; |
| pRequestHeader->commitOffset = commitOffset; |
| pRequestHeader->suspendTimeoutMillis = brokerSuspendMaxTimeMillis; |
| pRequestHeader->subscription = subExpression; |
| pRequestHeader->subVersion = subVersion; |
| |
| return m_MQClientFactory->getMQClientAPIImpl()->pullMessage(pFindBrokerResult->brokerAddr, pRequestHeader, |
| timeoutMillis, communicationMode, pullCallback, pArg, |
| session_credentials); |
| } |
| THROW_MQEXCEPTION(MQClientException, "The broker not exist", -1); |
| } |
| |
| } // namespace rocketmq |