blob: c572535087b5e79039cba0f34d90a10ac886990b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef __TOPICPUBLISHINFO_H__
#define __TOPICPUBLISHINFO_H__
#include <boost/asio.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/atomic.hpp>
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/thread/thread.hpp>
#include <map>
#include <string>
#include <vector>
#include "Logging.h"
#include "MQMessageQueue.h"
#include "UtilAll.h"
namespace rocketmq {
//<!************************************************************************/
class TopicPublishInfo {
public:
TopicPublishInfo() : m_sendWhichQueue(0) {
m_async_service_thread.reset(new boost::thread(boost::bind(&TopicPublishInfo::boost_asio_work, this)));
}
void boost_asio_work() {
boost::asio::io_service::work work(m_async_ioService); // avoid async io
// service stops
// after first timer
// timeout callback
boost::system::error_code e;
boost::asio::deadline_timer t(m_async_ioService, boost::posix_time::seconds(60));
t.async_wait(boost::bind(&TopicPublishInfo::op_resumeNonServiceMessageQueueList, this, e, &t));
boost::system::error_code ec;
m_async_ioService.run(ec);
}
virtual ~TopicPublishInfo() {
m_async_ioService.stop();
m_async_service_thread->interrupt();
m_async_service_thread->join();
m_nonSerivceQueues.clear();
m_onSerivceQueues.clear();
m_brokerTimerMap.clear();
m_queues.clear();
}
bool ok() {
boost::lock_guard<boost::mutex> lock(m_queuelock);
return !m_queues.empty();
}
void updateMessageQueueList(const MQMessageQueue& mq) {
boost::lock_guard<boost::mutex> lock(m_queuelock);
m_queues.push_back(mq);
string key = mq.getBrokerName() + UtilAll::to_string(mq.getQueueId());
m_onSerivceQueues[key] = mq;
if (m_nonSerivceQueues.find(key) != m_nonSerivceQueues.end()) {
m_nonSerivceQueues.erase(key); // if topicPublishInfo changed, erase this
// mq from m_nonSerivceQueues to avoid 2
// copies both in m_onSerivceQueues and
// m_nonSerivceQueues
}
}
void op_resumeNonServiceMessageQueueList(boost::system::error_code& ec, boost::asio::deadline_timer* t) {
resumeNonServiceMessageQueueList();
boost::system::error_code e;
t->expires_from_now(t->expires_from_now() + boost::posix_time::seconds(60), e);
t->async_wait(boost::bind(&TopicPublishInfo::op_resumeNonServiceMessageQueueList, this, e, t));
}
void resumeNonServiceMessageQueueList() {
boost::lock_guard<boost::mutex> lock(m_queuelock);
for (map<MQMessageQueue, int64>::iterator it = m_brokerTimerMap.begin(); it != m_brokerTimerMap.end(); ++it) {
if (UtilAll::currentTimeMillis() - it->second >= 1000 * 60 * 5) {
string key = it->first.getBrokerName() + UtilAll::to_string(it->first.getQueueId());
if (m_nonSerivceQueues.find(key) != m_nonSerivceQueues.end()) {
m_nonSerivceQueues.erase(key);
}
m_onSerivceQueues[key] = it->first;
}
}
}
void updateNonServiceMessageQueue(const MQMessageQueue& mq, int timeoutMilliseconds) {
boost::lock_guard<boost::mutex> lock(m_queuelock);
string key = mq.getBrokerName() + UtilAll::to_string(mq.getQueueId());
if (m_nonSerivceQueues.find(key) != m_nonSerivceQueues.end()) {
return;
}
LOG_INFO("updateNonServiceMessageQueue of mq:%s", mq.toString().c_str());
m_brokerTimerMap[mq] = UtilAll::currentTimeMillis();
m_nonSerivceQueues[key] = mq;
if (m_onSerivceQueues.find(key) != m_onSerivceQueues.end()) {
m_onSerivceQueues.erase(key);
}
}
vector<MQMessageQueue>& getMessageQueueList() {
boost::lock_guard<boost::mutex> lock(m_queuelock);
return m_queues;
}
int getWhichQueue() { return m_sendWhichQueue.load(boost::memory_order_acquire); }
MQMessageQueue selectOneMessageQueue(const MQMessageQueue& lastmq, int& mq_index) {
boost::lock_guard<boost::mutex> lock(m_queuelock);
if (m_queues.size() > 0) {
LOG_DEBUG("selectOneMessageQueue Enter, queue size:" SIZET_FMT "", m_queues.size());
unsigned int pos = 0;
if (mq_index >= 0) {
pos = mq_index % m_queues.size();
} else {
LOG_ERROR("mq_index is negative");
return MQMessageQueue();
}
if (!lastmq.getBrokerName().empty()) {
for (size_t i = 0; i < m_queues.size(); i++) {
if (m_sendWhichQueue.load(boost::memory_order_acquire) == (numeric_limits<int>::max)()) {
m_sendWhichQueue.store(0, boost::memory_order_release);
}
if (pos >= m_queues.size())
pos = pos % m_queues.size();
++m_sendWhichQueue;
MQMessageQueue mq = m_queues.at(pos);
LOG_DEBUG("lastmq broker not empty, m_sendWhichQueue:%d, pos:%d",
m_sendWhichQueue.load(boost::memory_order_acquire), pos);
if (mq.getBrokerName().compare(lastmq.getBrokerName()) != 0) {
mq_index = pos;
return mq;
}
++pos;
}
LOG_ERROR("could not find property mq");
return MQMessageQueue();
} else {
if (m_sendWhichQueue.load(boost::memory_order_acquire) == (numeric_limits<int>::max)()) {
m_sendWhichQueue.store(0, boost::memory_order_release);
}
++m_sendWhichQueue;
LOG_DEBUG("lastmq broker empty, m_sendWhichQueue:%d, pos:%d",
m_sendWhichQueue.load(boost::memory_order_acquire), pos);
mq_index = pos;
return m_queues.at(pos);
}
} else {
LOG_ERROR("m_queues empty");
return MQMessageQueue();
}
}
MQMessageQueue selectOneActiveMessageQueue(const MQMessageQueue& lastmq, int& mq_index) {
boost::lock_guard<boost::mutex> lock(m_queuelock);
if (m_queues.size() > 0) {
unsigned int pos = 0;
if (mq_index >= 0) {
pos = mq_index % m_queues.size();
} else {
LOG_ERROR("mq_index is negative");
return MQMessageQueue();
}
if (!lastmq.getBrokerName().empty()) {
for (size_t i = 0; i < m_queues.size(); i++) {
if (m_sendWhichQueue.load(boost::memory_order_acquire) == (numeric_limits<int>::max)()) {
m_sendWhichQueue.store(0, boost::memory_order_release);
}
if (pos >= m_queues.size())
pos = pos % m_queues.size();
++m_sendWhichQueue;
MQMessageQueue mq = m_queues.at(pos);
string key = mq.getBrokerName() + UtilAll::to_string(mq.getQueueId());
if ((mq.getBrokerName().compare(lastmq.getBrokerName()) != 0) &&
(m_onSerivceQueues.find(key) != m_onSerivceQueues.end())) {
mq_index = pos;
return mq;
}
++pos;
}
for (MQMAP::iterator it = m_nonSerivceQueues.begin(); it != m_nonSerivceQueues.end();
++it) { // if no MQMessageQueue(except lastmq) in
// m_onSerivceQueues, search m_nonSerivceQueues
if (it->second.getBrokerName().compare(lastmq.getBrokerName()) != 0)
return it->second;
}
LOG_ERROR("can not find property mq");
return MQMessageQueue();
} else {
for (size_t i = 0; i < m_queues.size(); i++) {
if (m_sendWhichQueue.load(boost::memory_order_acquire) == (numeric_limits<int>::max)()) {
m_sendWhichQueue.store(0, boost::memory_order_release);
}
if (pos >= m_queues.size())
pos = pos % m_queues.size();
++m_sendWhichQueue;
LOG_DEBUG("lastmq broker empty, m_sendWhichQueue:%d, pos:%d",
m_sendWhichQueue.load(boost::memory_order_acquire), pos);
mq_index = pos;
MQMessageQueue mq = m_queues.at(pos);
string key = mq.getBrokerName() + UtilAll::to_string(mq.getQueueId());
if (m_onSerivceQueues.find(key) != m_onSerivceQueues.end()) {
return mq;
} else {
++pos;
}
}
for (MQMAP::iterator it = m_nonSerivceQueues.begin(); it != m_nonSerivceQueues.end();
++it) { // if no MQMessageQueue(except lastmq) in
// m_onSerivceQueues, search m_nonSerivceQueues
if (it->second.getBrokerName().compare(lastmq.getBrokerName()) != 0)
return it->second;
}
LOG_ERROR("can not find property mq");
return MQMessageQueue();
}
} else {
LOG_ERROR("m_queues empty");
return MQMessageQueue();
}
}
private:
boost::mutex m_queuelock;
typedef vector<MQMessageQueue> QueuesVec;
QueuesVec m_queues;
typedef map<string, MQMessageQueue> MQMAP;
MQMAP m_onSerivceQueues;
MQMAP m_nonSerivceQueues;
boost::atomic<int> m_sendWhichQueue;
map<MQMessageQueue, int64> m_brokerTimerMap;
boost::asio::io_service m_async_ioService;
boost::scoped_ptr<boost::thread> m_async_service_thread;
};
//<!***************************************************************************
} // namespace rocketmq
#endif