// Source blob: e578840889918da9b64159c902bc7ca1f6d197d9
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "PullRequest.h"
#include "Logging.h"
namespace rocketmq {
// Rebalance lock timing. RebalanceLockMaxLiveTime is compared against a
// millisecond clock difference in isLockExpired(); RebalanceLockInterval is
// presumably in milliseconds as well (its use is not in this file).
const uint64 PullRequest::RebalanceLockInterval = 20 * 1000;
const uint64 PullRequest::RebalanceLockMaxLiveTime = 30 * 1000;
/**
 * If the process queue has not been pulled for more than MAX_PULL_IDLE_TIME, we need to mark it as dropped
 * default 120s
 */
const uint64 PullRequest::MAX_PULL_IDLE_TIME = 120 * 1000;
// Creates a pull request for consumer group `groupname`.
// Offsets start at 0, the request is neither dropped nor locked, and the
// lock / pull / consume timestamps are all initialised to the current time.
PullRequest::PullRequest(const string& groupname)
    : m_groupname(groupname), m_nextOffset(0), m_queueOffsetMax(0), m_bDropped(false), m_bLocked(false) {
  m_lastLockTimestamp = UtilAll::currentTimeMillis();
  m_lastPullTimestamp = UtilAll::currentTimeMillis();
  m_lastConsumeTimestamp = UtilAll::currentTimeMillis();
}
// Nothing is released explicitly here; members are destroyed automatically.
PullRequest::~PullRequest() {
}
// Copies the state of `other` into this request and returns *this.
// Self-assignment is a no-op.
//
// Only this object's m_pullRequestLock is held while copying; `other` is read
// without taking its lock, so the caller must ensure `other` is not being
// modified concurrently.
PullRequest& PullRequest::operator=(const PullRequest& other) {
  boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
  if (this != &other) {
    m_groupname = other.m_groupname;
    m_nextOffset = other.m_nextOffset;
    m_queueOffsetMax = other.m_queueOffsetMax;
    m_messageQueue = other.m_messageQueue;
    m_msgTreeMap = other.m_msgTreeMap;
    m_msgTreeMapTemp = other.m_msgTreeMapTemp;
    // Carry the dropped/locked flags and the lock timestamp over as well;
    // otherwise the target keeps its own stale values and isDropped(),
    // isLocked() and isLockExpired() disagree with the copied queue state.
    m_bDropped.store(other.m_bDropped.load());
    m_bLocked.store(other.m_bLocked.load());
    m_lastLockTimestamp = other.m_lastLockTimestamp;
    m_lastPullTimestamp = other.m_lastPullTimestamp;
    m_lastConsumeTimestamp = other.m_lastConsumeTimestamp;
  }
  return *this;
}
// Adds `msgs` to the cache, keyed by queue offset, under m_pullRequestLock.
// A message whose offset is already cached overwrites the existing entry.
// m_queueOffsetMax is raised to the largest offset seen so far.
void PullRequest::putMessage(vector<MQMessageExt>& msgs) {
  boost::lock_guard<boost::mutex> lock(m_pullRequestLock);

  for (vector<MQMessageExt>::iterator it = msgs.begin(); it != msgs.end(); ++it) {
    m_msgTreeMap[it->getQueueOffset()] = *it;
    m_queueOffsetMax = (std::max)(m_queueOffsetMax, it->getQueueOffset());
  }
  // Logged once per batch, after the maximum has been updated.
  LOG_DEBUG("PullRequest: putMessage m_queueOffsetMax:%lld ", m_queueOffsetMax);
}
// Walks every entry of m_msgTreeMap, in ascending key order, while holding
// m_pullRequestLock.
// NOTE(review): the loop body and the closing braces of this function are
// missing from this text. Presumably each it->second is appended to `msgs`;
// confirm against the upstream file before relying on this.
void PullRequest::getMessage(vector<MQMessageExt>& msgs) {
boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
map<int64, MQMessageExt>::iterator it = m_msgTreeMap.begin();
for (; it != m_msgTreeMap.end(); it++) {
// Returns the queue offset of the first (lowest-keyed) cached message,
// or 0 when the cache is empty. Takes m_pullRequestLock.
int64 PullRequest::getCacheMinOffset() {
  boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
  if (m_msgTreeMap.empty()) {
    return 0;
  }
  // Query the stored message in place; copying the whole message just to
  // read its offset is unnecessary work while the lock is held.
  return m_msgTreeMap.begin()->second.getQueueOffset();
}
// Returns m_queueOffsetMax. The value is read without taking
// m_pullRequestLock.
int64 PullRequest::getCacheMaxOffset() {
  return m_queueOffsetMax;
}
// Returns the number of messages currently held in m_msgTreeMap.
// Takes m_pullRequestLock.
int PullRequest::getCacheMsgCount() {
  boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
  // The interface returns int, so the size_t -> int narrowing is made explicit.
  return static_cast<int>(m_msgTreeMap.size());
}
// Iterates over every offset in the inclusive range
// [minQueueOffset, maxQueueOffset] while holding m_pullRequestLock.
// NOTE(review): the loop body and the closing braces of this function are
// missing from this text. Presumably the cached message for each offset is
// appended to `msgs`; how offsets that are not cached are handled cannot be
// determined from here. Confirm against the upstream file.
void PullRequest::getMessageByQueueOffset(vector<MQMessageExt>& msgs, int64 minQueueOffset, int64 maxQueueOffset) {
boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
int64 it = minQueueOffset;
for (; it <= maxQueueOffset; it++) {
// Removes the messages in `msgs` from the cache and returns the offset that
// should be reported as consumed. Takes m_pullRequestLock.
//
// Returns:
//   -1                    if the cache was already empty on entry;
//   m_queueOffsetMax + 1  if the cache becomes empty after the removal;
//   the smallest offset still cached otherwise.
int64 PullRequest::removeMessage(vector<MQMessageExt>& msgs) {
  boost::lock_guard<boost::mutex> lock(m_pullRequestLock);

  int64 result = -1;
  LOG_DEBUG("m_queueOffsetMax is:%lld", m_queueOffsetMax);
  if (!m_msgTreeMap.empty()) {
    // Default answer if nothing is left after the removal below.
    result = m_queueOffsetMax + 1;
    LOG_DEBUG(" offset result is:%lld, m_queueOffsetMax is:%lld, msgs size:" SIZET_FMT "", result, m_queueOffsetMax,
              msgs.size());
    for (vector<MQMessageExt>::iterator msg = msgs.begin(); msg != msgs.end(); ++msg) {
      LOG_DEBUG("remove these msg from m_msgTreeMap, its offset:%lld", msg->getQueueOffset());
      m_msgTreeMap.erase(msg->getQueueOffset());
    }

    if (!m_msgTreeMap.empty()) {
      // Messages remain: the lowest remaining offset is the answer.
      result = m_msgTreeMap.begin()->first;
      LOG_INFO("cache msg size:" SIZET_FMT " of pullRequest:%s, return offset result is:%lld", m_msgTreeMap.size(),
               m_messageQueue.toString().c_str(), result);
    }
  }

  return result;
}
void PullRequest::clearAllMsgs() {
boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
if (isDropped()) {
LOG_DEBUG("clear m_msgTreeMap as PullRequest had been dropped.");
// Unconditionally overwrites m_queueOffsetMax with `queueOffset`.
// The value is written without taking m_pullRequestLock.
void PullRequest::updateQueueMaxOffset(int64 queueOffset) {
  // The new value is allowed to be smaller than the current maximum.
  // Two cases where that happens:
  // 1. a resetOffset command;
  // 2. during rebalance, if configured with CONSUMER_FROM_FIRST_OFFSET and
  //    the readOffset called by computePullFromWhere failed, m_nextOffset
  //    will be set to 0.
  m_queueOffsetMax = queueOffset;
}
void PullRequest::setDropped(bool dropped) {
int temp = (dropped == true ? 1 : 0);;
m_queueOffsetMax = 0;
m_nextOffset = 0;
//the reason why not clear m_queueOffsetMax and m_nextOffset is due to
ConsumeMsgService and drop mq are concurrent running.
consider following situation:
1>. ConsumeMsgService running
2>. dorebalance, drop mq, reset m_nextOffset and m_queueOffsetMax
3>. ConsumeMsgService calls removeMessages, if no other msgs in
m_msgTreeMap, m_queueOffsetMax(0)+1 will return;
4>. updateOffset with 1, which is more smaller than correct offset.
// Returns true when the dropped flag is set.
bool PullRequest::isDropped() const {
  return m_bDropped.load() == 1;
}
// Returns m_nextOffset. Takes m_pullRequestLock.
int64 PullRequest::getNextOffset() {
  boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
  return m_nextOffset;
}
void PullRequest::setLocked(bool Locked) {
int temp = (Locked == true ? 1 : 0);;
// Returns true when the locked flag is set.
bool PullRequest::isLocked() const {
  return m_bLocked.load() == 1;
}
// Returns true when more than RebalanceLockMaxLiveTime has elapsed between
// m_lastLockTimestamp and the current time.
bool PullRequest::isLockExpired() const {
  return (UtilAll::currentTimeMillis() - m_lastLockTimestamp) > RebalanceLockMaxLiveTime;
}
// Records `time` as the moment the lock was last taken or refreshed.
// isLockExpired() compares this value with UtilAll::currentTimeMillis().
void PullRequest::setLastLockTimestamp(int64 time) {
  m_lastLockTimestamp = time;
}
// Returns the stored lock timestamp (m_lastLockTimestamp).
int64 PullRequest::getLastLockTimestamp() const {
  return m_lastLockTimestamp;
}
// Records `time` as the moment of the last pull.
// isPullRequestExpired() measures idleness from this value.
void PullRequest::setLastPullTimestamp(uint64 time) {
  m_lastPullTimestamp = time;
}
// Returns the stored last-pull timestamp (m_lastPullTimestamp).
uint64 PullRequest::getLastPullTimestamp() const {
  return m_lastPullTimestamp;
}
bool PullRequest::isPullRequestExpired() const {
uint64 interval = m_lastPullTimestamp + MAX_PULL_IDLE_TIME;
if (interval <= UtilAll::currentTimeMillis()) {
LOG_WARN("PullRequest for [%s] has been expired %lld ms,m_lastPullTimestamp = %lld ms",
m_messageQueue.toString().c_str(), UtilAll::currentTimeMillis() - interval, m_lastPullTimestamp);
return true;
return false;
// Records `time` as the moment of the last consume.
void PullRequest::setLastConsumeTimestamp(uint64 time) {
  m_lastConsumeTimestamp = time;
}
// Returns the stored last-consume timestamp (m_lastConsumeTimestamp).
uint64 PullRequest::getLastConsumeTimestamp() const {
  return m_lastConsumeTimestamp;
}
// Stores `time` in m_lastLockTimestamp.
// NOTE(review): this overwrites the same member that setLastLockTimestamp()
// writes and isLockExpired() reads, so setting a small retry count here makes
// the lock look long expired. It looks like a dedicated counter member was
// intended - confirm against the class declaration.
void PullRequest::setTryUnlockTimes(int time) {
  m_lastLockTimestamp = time;
}
// Returns m_lastLockTimestamp converted to int.
// NOTE(review): this reads the lock timestamp rather than a dedicated
// counter, so after setLastLockTimestamp() the result is a truncated
// millisecond value - confirm the intended member against the class
// declaration.
int PullRequest::getTryUnlockTimes() const {
  return static_cast<int>(m_lastLockTimestamp);
}
// Sets m_nextOffset to `nextoffset`. Takes m_pullRequestLock.
void PullRequest::setNextOffset(int64 nextoffset) {
  boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
  m_nextOffset = nextoffset;
}
// Returns a copy of the consumer group name given at construction
// (or copied in by operator=).
string PullRequest::getGroupName() const {
  return m_groupname;
}
// Returns a reference to m_consumeLock so that callers can lock it
// themselves. This is a different mutex from m_pullRequestLock, which guards
// the message caches inside this class.
boost::timed_mutex& PullRequest::getPullRequestCriticalSection() {
  return m_consumeLock;
}
// While holding m_pullRequestLock, repeats up to `batchSize` times: look at
// the first (lowest-keyed) entry of m_msgTreeMap and, if there is one, copy
// it into m_msgTreeMapTemp under the same key.
// NOTE(review): the rest of the loop body and all closing braces are missing
// from this text. Presumably the entry is also appended to `msgs` and erased
// from m_msgTreeMap (otherwise every iteration would see the same entry);
// confirm against the upstream file.
// NOTE(review): the loop condition is `i != batchSize`, so a negative
// batchSize would not terminate as intended; `i < batchSize` would be safer.
void PullRequest::takeMessages(vector<MQMessageExt>& msgs, int batchSize) {
boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
for (int i = 0; i != batchSize; i++) {
map<int64, MQMessageExt>::iterator it = m_msgTreeMap.begin();
if (it != m_msgTreeMap.end()) {
m_msgTreeMapTemp[it->first] = it->second;
// Puts every message in `msgs` back into m_msgTreeMap, keyed by its queue
// offset, so that it can be taken for consumption again.
// Takes m_pullRequestLock.
// NOTE(review): the entries are not removed from m_msgTreeMapTemp here;
// verify whether commit() is meant to still see them.
void PullRequest::makeMessageToCosumeAgain(vector<MQMessageExt>& msgs) {
  boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
  for (vector<MQMessageExt>::size_type idx = 0; idx != msgs.size(); ++idx) {
    m_msgTreeMap[msgs[idx].getQueueOffset()] = msgs[idx];
  }
}
// Returns the offset following the highest key in m_msgTreeMapTemp,
// or -1 when that map is empty. Takes m_pullRequestLock.
// NOTE(review): m_msgTreeMapTemp is not emptied here, so repeated calls
// return the same value until something else changes the map - confirm that
// this is intended.
int64 PullRequest::commit() {
  boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
  if (m_msgTreeMapTemp.empty()) {
    return -1;
  }
  // The map is ordered by key, so the last element carries the highest offset.
  const int64 lastOffset = m_msgTreeMapTemp.rbegin()->first;
  return lastOffset + 1;
}
} // namespace rocketmq