blob: e578840889918da9b64159c902bc7ca1f6d197d9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "PullRequest.h"
#include "Logging.h"
namespace rocketmq {
//<!***************************************************************************
const uint64 PullRequest::RebalanceLockInterval = 20 * 1000;
const uint64 PullRequest::RebalanceLockMaxLiveTime = 30 * 1000;
/**
* If the process queue has not been pulled for more than MAX_PULL_IDLE_TIME, we need to mark it as dropped
* default 120s
*/
const uint64 PullRequest::MAX_PULL_IDLE_TIME = 120 * 1000;
PullRequest::PullRequest(const string& groupname)
: m_groupname(groupname), m_nextOffset(0), m_queueOffsetMax(0), m_bDropped(false), m_bLocked(false) {
m_lastLockTimestamp = UtilAll::currentTimeMillis();
m_lastPullTimestamp = UtilAll::currentTimeMillis();
m_lastConsumeTimestamp = UtilAll::currentTimeMillis();
}
PullRequest::~PullRequest() {
m_msgTreeMapTemp.clear();
m_msgTreeMap.clear();
}
PullRequest& PullRequest::operator=(const PullRequest& other) {
boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
if (this != &other) {
m_groupname = other.m_groupname;
m_nextOffset = other.m_nextOffset;
m_bDropped.store(other.m_bDropped.load());
m_queueOffsetMax = other.m_queueOffsetMax;
m_messageQueue = other.m_messageQueue;
m_msgTreeMap = other.m_msgTreeMap;
m_msgTreeMapTemp = other.m_msgTreeMapTemp;
m_lastPullTimestamp = other.m_lastPullTimestamp;
m_lastConsumeTimestamp = other.m_lastConsumeTimestamp;
}
return *this;
}
void PullRequest::putMessage(vector<MQMessageExt>& msgs) {
boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
vector<MQMessageExt>::iterator it = msgs.begin();
for (; it != msgs.end(); it++) {
m_msgTreeMap[it->getQueueOffset()] = *it;
m_queueOffsetMax = (std::max)(m_queueOffsetMax, it->getQueueOffset());
}
LOG_DEBUG("PullRequest: putMessage m_queueOffsetMax:%lld ", m_queueOffsetMax);
}
void PullRequest::getMessage(vector<MQMessageExt>& msgs) {
boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
map<int64, MQMessageExt>::iterator it = m_msgTreeMap.begin();
for (; it != m_msgTreeMap.end(); it++) {
msgs.push_back(it->second);
}
}
int64 PullRequest::getCacheMinOffset() {
boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
if (m_msgTreeMap.empty()) {
return 0;
} else {
map<int64, MQMessageExt>::iterator it = m_msgTreeMap.begin();
MQMessageExt msg = it->second;
return msg.getQueueOffset();
}
}
int64 PullRequest::getCacheMaxOffset() {
return m_queueOffsetMax;
}
int PullRequest::getCacheMsgCount() {
boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
return m_msgTreeMap.size();
}
void PullRequest::getMessageByQueueOffset(vector<MQMessageExt>& msgs, int64 minQueueOffset, int64 maxQueueOffset) {
boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
int64 it = minQueueOffset;
for (; it <= maxQueueOffset; it++) {
msgs.push_back(m_msgTreeMap[it]);
}
}
int64 PullRequest::removeMessage(vector<MQMessageExt>& msgs) {
boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
int64 result = -1;
LOG_DEBUG("m_queueOffsetMax is:%lld", m_queueOffsetMax);
if (!m_msgTreeMap.empty()) {
result = m_queueOffsetMax + 1;
LOG_DEBUG(" offset result is:%lld, m_queueOffsetMax is:%lld, msgs size:" SIZET_FMT "", result, m_queueOffsetMax,
msgs.size());
vector<MQMessageExt>::iterator it = msgs.begin();
for (; it != msgs.end(); it++) {
LOG_DEBUG("remove these msg from m_msgTreeMap, its offset:%lld", it->getQueueOffset());
m_msgTreeMap.erase(it->getQueueOffset());
}
if (!m_msgTreeMap.empty()) {
map<int64, MQMessageExt>::iterator it = m_msgTreeMap.begin();
result = it->first;
LOG_INFO("cache msg size:" SIZET_FMT " of pullRequest:%s, return offset result is:%lld", m_msgTreeMap.size(),
m_messageQueue.toString().c_str(), result);
}
}
return result;
}
void PullRequest::clearAllMsgs() {
boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
if (isDropped()) {
LOG_DEBUG("clear m_msgTreeMap as PullRequest had been dropped.");
m_msgTreeMap.clear();
m_msgTreeMapTemp.clear();
}
}
void PullRequest::updateQueueMaxOffset(int64 queueOffset) {
// following 2 cases which may set queueOffset smaller than m_queueOffsetMax:
// 1. resetOffset cmd
// 2. during rebalance, if configured with CONSUMER_FROM_FIRST_OFFSET, when
// readOffset called by computePullFromWhere was failed, m_nextOffset will be
// setted to 0
m_queueOffsetMax = queueOffset;
}
void PullRequest::setDropped(bool dropped) {
int temp = (dropped == true ? 1 : 0);
m_bDropped.store(temp);
/*
m_queueOffsetMax = 0;
m_nextOffset = 0;
//the reason why not clear m_queueOffsetMax and m_nextOffset is due to
ConsumeMsgService and drop mq are concurrent running.
consider following situation:
1>. ConsumeMsgService running
2>. dorebalance, drop mq, reset m_nextOffset and m_queueOffsetMax
3>. ConsumeMsgService calls removeMessages, if no other msgs in
m_msgTreeMap, m_queueOffsetMax(0)+1 will return;
4>. updateOffset with 1, which is more smaller than correct offset.
*/
}
bool PullRequest::isDropped() const {
return m_bDropped.load() == 1;
}
int64 PullRequest::getNextOffset() {
boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
return m_nextOffset;
}
void PullRequest::setLocked(bool Locked) {
int temp = (Locked == true ? 1 : 0);
m_bLocked.store(temp);
}
bool PullRequest::isLocked() const {
return m_bLocked.load() == 1;
}
bool PullRequest::isLockExpired() const {
return (UtilAll::currentTimeMillis() - m_lastLockTimestamp) > RebalanceLockMaxLiveTime;
}
void PullRequest::setLastLockTimestamp(int64 time) {
m_lastLockTimestamp = time;
}
int64 PullRequest::getLastLockTimestamp() const {
return m_lastLockTimestamp;
}
void PullRequest::setLastPullTimestamp(uint64 time) {
m_lastPullTimestamp = time;
}
uint64 PullRequest::getLastPullTimestamp() const {
return m_lastPullTimestamp;
}
bool PullRequest::isPullRequestExpired() const {
uint64 interval = m_lastPullTimestamp + MAX_PULL_IDLE_TIME;
if (interval <= UtilAll::currentTimeMillis()) {
LOG_WARN("PullRequest for [%s] has been expired %lld ms,m_lastPullTimestamp = %lld ms",
m_messageQueue.toString().c_str(), UtilAll::currentTimeMillis() - interval, m_lastPullTimestamp);
return true;
}
return false;
}
void PullRequest::setLastConsumeTimestamp(uint64 time) {
m_lastConsumeTimestamp = time;
}
uint64 PullRequest::getLastConsumeTimestamp() const {
return m_lastConsumeTimestamp;
}
void PullRequest::setTryUnlockTimes(int time) {
m_lastLockTimestamp = time;
}
int PullRequest::getTryUnlockTimes() const {
return m_lastLockTimestamp;
}
void PullRequest::setNextOffset(int64 nextoffset) {
boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
m_nextOffset = nextoffset;
}
string PullRequest::getGroupName() const {
return m_groupname;
}
boost::timed_mutex& PullRequest::getPullRequestCriticalSection() {
return m_consumeLock;
}
void PullRequest::takeMessages(vector<MQMessageExt>& msgs, int batchSize) {
boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
for (int i = 0; i != batchSize; i++) {
map<int64, MQMessageExt>::iterator it = m_msgTreeMap.begin();
if (it != m_msgTreeMap.end()) {
msgs.push_back(it->second);
m_msgTreeMapTemp[it->first] = it->second;
m_msgTreeMap.erase(it);
}
}
}
void PullRequest::makeMessageToCosumeAgain(vector<MQMessageExt>& msgs) {
boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
for (unsigned int it = 0; it != msgs.size(); ++it) {
m_msgTreeMap[msgs[it].getQueueOffset()] = msgs[it];
m_msgTreeMapTemp.erase(msgs[it].getQueueOffset());
}
}
int64 PullRequest::commit() {
boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
if (!m_msgTreeMapTemp.empty()) {
int64 offset = (--m_msgTreeMapTemp.end())->first;
m_msgTreeMapTemp.clear();
return offset + 1;
} else {
return -1;
}
}
//<!***************************************************************************
} // namespace rocketmq