// blob: c4145769af1e3ca82b5da72a69fdb4d5135e03be [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef __PULLREQUEST_H__
#define __PULLREQUEST_H__
#include <boost/atomic.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/thread.hpp>
#include "MQMessageExt.h"
#include "MQMessageQueue.h"
#include "UtilAll.h"
namespace rocketmq {
//<!***************************************************************************
/**
* PullRequest bundles one MQMessageQueue with the pull/consume bookkeeping
* declared below: a group name, offset counters, two int64-keyed message
* maps, dropped/locked flags, timestamps and two mutexes.
*
* Only declarations are visible in this header; every member function is
* defined out of line, so the behavioral notes below are hedged and should
* be confirmed against the implementation file.
*/
class PullRequest {
public:
// Constructs a request for the given group name (presumably stored in
// m_groupname -- confirm in the implementation).
// NOTE(review): single-argument constructor is not `explicit`, so a string
// converts implicitly to a PullRequest.
PullRequest(const string& groupname);
// Virtual destructor: the class may be deleted through a base-typed pointer
// if it is ever derived from.
virtual ~PullRequest();
// --- Cached messages -------------------------------------------------------
// Both take the vector by non-const reference, so the direction of data flow
// is not visible here. Presumably putMessage adds msgs to the cache and
// getMessage copies cached messages out -- confirm.
void putMessage(vector<MQMessageExt>& msgs);
void getMessage(vector<MQMessageExt>& msgs);
// Cache statistics; presumably derived from m_msgTreeMap -- confirm.
int64 getCacheMinOffset();
int64 getCacheMaxOffset();
int getCacheMsgCount();
// Out-parameter msgs; whether the [minQueueOffset, maxQueueOffset] bounds are
// inclusive is not visible from this declaration -- confirm.
void getMessageByQueueOffset(vector<MQMessageExt>& msgs, int64 minQueueOffset, int64 maxQueueOffset);
// Returns an int64 whose meaning is not visible here (presumably an offset
// computed after removing msgs from the cache -- confirm).
int64 removeMessage(vector<MQMessageExt>& msgs);
void clearAllMsgs();
// NOTE(review): copy assignment is declared but no copy constructor is.
// The class holds boost::mutex / boost::timed_mutex members, which are not
// copyable, so the definition cannot be a plain member-wise copy.
PullRequest& operator=(const PullRequest& other);
// --- Dropped flag / offsets / group ----------------------------------------
// Presumably backed by the atomic m_bDropped -- confirm.
void setDropped(bool dropped);
bool isDropped() const;
// Presumably backed by m_nextOffset -- confirm. getNextOffset is non-const.
int64 getNextOffset();
void setNextOffset(int64 nextoffset);
// Returns the group name by value.
string getGroupName() const;
// Presumably updates m_queueOffsetMax -- confirm.
void updateQueueMaxOffset(int64 queueOffset);
// --- Lock state ------------------------------------------------------------
// Presumably backed by the atomic m_bLocked -- confirm.
void setLocked(bool Locked);
bool isLocked() const;
// Presumably compares the last lock timestamp against
// RebalanceLockMaxLiveTime -- confirm.
bool isLockExpired() const;
// NOTE(review): these accessors use signed int64, while m_lastLockTimestamp
// is declared uint64; assuming they are backed by that member, values are
// converted between signed and unsigned -- confirm this is intended.
void setLastLockTimestamp(int64 time);
int64 getLastLockTimestamp() const;
// --- Pull / consume timestamps ---------------------------------------------
// Presumably backed by m_lastPullTimestamp -- confirm.
void setLastPullTimestamp(uint64 time);
uint64 getLastPullTimestamp() const;
// Presumably compares the last pull timestamp against MAX_PULL_IDLE_TIME
// -- confirm.
bool isPullRequestExpired() const;
// Presumably backed by m_lastConsumeTimestamp -- confirm.
void setLastConsumeTimestamp(uint64 time);
uint64 getLastConsumeTimestamp() const;
// NOTE(review): the only matching member, m_tryUnlockTimes, is commented out
// in the private section below; verify what these two accessors operate on.
void setTryUnlockTimes(int time);
int getTryUnlockTimes() const;
// --- Batch consumption -----------------------------------------------------
// Out-parameter msgs with a batchSize limit; presumably moves up to batchSize
// messages from m_msgTreeMap into m_msgTreeMapTemp and msgs -- confirm.
void takeMessages(vector<MQMessageExt>& msgs, int batchSize);
// Returns an int64 whose meaning is not visible here (presumably an offset
// after the taken messages are acknowledged -- confirm).
int64 commit();
// Name is misspelled ("Cosume") but is part of the public interface.
// Presumably returns msgs to the cache for re-consumption -- confirm.
void makeMessageToCosumeAgain(vector<MQMessageExt>& msgs);
// Returns a reference to a boost::timed_mutex; m_consumeLock is the only
// member of that type, so presumably that is what is returned -- confirm.
boost::timed_mutex& getPullRequestCriticalSection();
// --- Pull event tracking ---------------------------------------------------
// Both return bool; the meaning of the result and of `force` (default false)
// is not visible here. NOTE(review): no member backing this state is declared
// in the visible part of the class -- confirm where it lives.
bool removePullMsgEvent(bool force = false);
bool addPullMsgEvent();
/**
* Check if there is an in-flight pull request.
*/
bool hasInFlightPullRequest() const;
public:
// Publicly accessible message queue associated with this request.
MQMessageQueue m_messageQueue;
// Class-wide constants, declared here and defined elsewhere; the trailing
// comments give their unit as milliseconds.
static const uint64 RebalanceLockInterval; // ms
static const uint64 RebalanceLockMaxLiveTime; // ms
static const uint64 MAX_PULL_IDLE_TIME; // ms
private:
string m_groupname;
int64 m_nextOffset;
int64 m_queueOffsetMax;
// Atomic flags, readable without holding a mutex.
boost::atomic<bool> m_bDropped;
boost::atomic<bool> m_bLocked;
// Ordered maps from an int64 key (presumably queue offset -- confirm) to the
// message; the Temp map presumably holds messages taken but not yet
// committed -- confirm.
map<int64, MQMessageExt> m_msgTreeMap;
map<int64, MQMessageExt> m_msgTreeMapTemp;
// Presumably guards the message maps and offsets -- confirm in implementation.
boost::mutex m_pullRequestLock;
uint64 m_lastLockTimestamp; // ms
// Disabled member; setTryUnlockTimes/getTryUnlockTimes are still declared.
// uint64 m_tryUnlockTimes;
uint64 m_lastPullTimestamp;
uint64 m_lastConsumeTimestamp;
// Timed mutex (supports lock attempts with a timeout).
boost::timed_mutex m_consumeLock;
};
//<!************************************************************************
} // namespace rocketmq
#endif