Consume message from slave if master is down. (#105)
merged from MQClientInstance#findBrokerAddrByTopic, BrokerData#selectBrokerAddr and MQClientInstance#findBrokerAddressInSubscribe in Java client.
diff --git a/src/MQClientFactory.cpp b/src/MQClientFactory.cpp
index 4eccf34..2dd7bdb 100644
--- a/src/MQClientFactory.cpp
+++ b/src/MQClientFactory.cpp
@@ -631,35 +631,38 @@
return "";
}
-FindBrokerResult* MQClientFactory::findBrokerAddressInSubscribe(
- const string& brokerName, int brokerId, bool onlyThisBroker) {
- string brokerAddr;
- bool slave = false;
- bool found = false;
- BrokerAddrMAP brokerTable(getBrokerAddrMap());
+FindBrokerResult *MQClientFactory::findBrokerAddressInSubscribe(const string &brokerName,
+ int brokerId,
+ bool onlyThisBroker) {
+ string brokerAddr;
+ bool slave = false;
+ bool found = false;
+ BrokerAddrMAP brokerTable(getBrokerAddrMap());
- if (brokerTable.find(brokerName) != brokerTable.end()) {
- map<int, string> brokerMap(brokerTable[brokerName]);
- map<int, string>::iterator it1 = brokerMap.find(brokerId);
- if (it1 != brokerMap.end()) {
- brokerAddr = it1->second;
- slave = (brokerId != MASTER_ID);
- found = true;
- } else // from master
- {
- it1 = brokerMap.find(MASTER_ID);
- if (it1 != brokerMap.end()) {
- brokerAddr = it1->second;
- slave = false;
- found = true;
- }
+ if (brokerTable.find(brokerName) != brokerTable.end()) {
+ map<int, string> brokerMap(brokerTable[brokerName]);
+ if (!brokerMap.empty()) {
+ auto iter = brokerMap.find(brokerId);
+ if (iter != brokerMap.end()) {
+ brokerAddr = iter->second;
+ slave = (brokerId != MASTER_ID);
+ found = true;
+ } else if (!onlyThisBroker) { // not only from master
+ iter = brokerMap.begin();
+ brokerAddr = iter->second;
+ slave = iter->first != MASTER_ID;
+ found = true;
+ }
+ }
}
- }
- brokerTable.clear();
- if (found) return new FindBrokerResult(brokerAddr, slave);
+ brokerTable.clear();
- return NULL;
+ if (found) {
+ return new FindBrokerResult(brokerAddr, slave);
+ }
+
+ return nullptr;
}
FindBrokerResult* MQClientFactory::findBrokerAddressInAdmin(
diff --git a/src/protocol/TopicRouteData.h b/src/protocol/TopicRouteData.h
index ec8f842..9f70698 100755
--- a/src/protocol/TopicRouteData.h
+++ b/src/protocol/TopicRouteData.h
@@ -17,161 +17,147 @@
#ifndef __TOPICROUTEDATA_H__
#define __TOPICROUTEDATA_H__
#include <algorithm>
+#include <cstdlib>
#include "Logging.h"
#include "UtilAll.h"
#include "dataBlock.h"
#include "json/json.h"
namespace rocketmq {
+
//<!***************************************************************************
struct QueueData {
- string brokerName;
- int readQueueNums;
- int writeQueueNums;
- int perm;
+ std::string brokerName;
+ int readQueueNums;
+ int writeQueueNums;
+ int perm;
- bool operator<(const QueueData& other) const {
- return brokerName < other.brokerName;
- }
+ bool operator<(const QueueData &other) const { return brokerName < other.brokerName; }
- bool operator==(const QueueData& other) const {
- if (brokerName == other.brokerName &&
- readQueueNums == other.readQueueNums &&
- writeQueueNums == other.writeQueueNums && perm == other.perm) {
- return true;
+ bool operator==(const QueueData &other) const {
+ return brokerName == other.brokerName && readQueueNums == other.readQueueNums &&
+ writeQueueNums == other.writeQueueNums && perm == other.perm;
}
- return false;
- }
};
//<!***************************************************************************
struct BrokerData {
- string brokerName;
- map<int, string> brokerAddrs; //<!0:master,1,2.. slave
+ std::string brokerName;
+ std::map<int, string> brokerAddrs; //<!0:master,1,2.. slave
- bool operator<(const BrokerData& other) const {
- return brokerName < other.brokerName;
- }
+ bool operator<(const BrokerData &other) const { return brokerName < other.brokerName; }
- bool operator==(const BrokerData& other) const {
- if (brokerName == other.brokerName && brokerAddrs == other.brokerAddrs) {
- return true;
+ bool operator==(const BrokerData &other) const {
+ return brokerName == other.brokerName && brokerAddrs == other.brokerAddrs;
}
- return false;
- }
};
//<!************************************************************************/
class TopicRouteData {
- public:
- virtual ~TopicRouteData() {
- m_brokerDatas.clear();
- m_queueDatas.clear();
- }
-
- static TopicRouteData* Decode(const MemoryBlock* mem) {
- //<!see doc/TopicRouteData.json;
- const char* const pData = static_cast<const char*>(mem->getData());
- string data(pData, mem->getSize());
-
- Json::Value root;
- Json::CharReaderBuilder charReaderBuilder;
- charReaderBuilder.settings_["allowNumericKeys"] = true;
- unique_ptr<Json::CharReader> pCharReaderPtr(charReaderBuilder.newCharReader());
- const char* begin = pData;
- const char* end = pData + mem->getSize();
- string errs;
- if (!pCharReaderPtr->parse(begin, end, &root, &errs)) {
- LOG_ERROR("parse json error:%s, value isArray:%d, isObject:%d", errs.c_str(), root.isArray(), root.isObject());
- return NULL;
+ public:
+ virtual ~TopicRouteData() {
+ m_brokerDatas.clear();
+ m_queueDatas.clear();
}
- TopicRouteData* trd = new TopicRouteData();
- trd->setOrderTopicConf(root["orderTopicConf"].asString());
+ static TopicRouteData *Decode(const MemoryBlock *mem) {
+ //<!see doc/TopicRouteData.json;
+ const char *const pData = static_cast<const char *>(mem->getData());
+ string data(pData, mem->getSize());
- Json::Value qds = root["queueDatas"];
- for (unsigned int i = 0; i < qds.size(); i++) {
- QueueData d;
- Json::Value qd = qds[i];
- d.brokerName = qd["brokerName"].asString();
- d.readQueueNums = qd["readQueueNums"].asInt();
- d.writeQueueNums = qd["writeQueueNums"].asInt();
- d.perm = qd["perm"].asInt();
+ Json::CharReaderBuilder charReaderBuilder;
+ charReaderBuilder.settings_["allowNumericKeys"] = true;
+ unique_ptr<Json::CharReader> pCharReaderPtr(charReaderBuilder.newCharReader());
- trd->getQueueDatas().push_back(d);
+ const char *begin = pData;
+ const char *end = pData + mem->getSize();
+ Json::Value root;
+ string errs;
+
+ if (!pCharReaderPtr->parse(begin, end, &root, &errs)) {
+ LOG_ERROR("parse json error:%s, value isArray:%d, isObject:%d", errs.c_str(), root.isArray(),
+ root.isObject());
+ return nullptr;
+ }
+
+ auto *trd = new TopicRouteData();
+ trd->setOrderTopicConf(root["orderTopicConf"].asString());
+
+ Json::Value qds = root["queueDatas"];
+ for (auto qd : qds) {
+ QueueData d;
+ d.brokerName = qd["brokerName"].asString();
+ d.readQueueNums = qd["readQueueNums"].asInt();
+ d.writeQueueNums = qd["writeQueueNums"].asInt();
+ d.perm = qd["perm"].asInt();
+ trd->getQueueDatas().push_back(d);
+ }
+ sort(trd->getQueueDatas().begin(), trd->getQueueDatas().end());
+
+ Json::Value bds = root["brokerDatas"];
+ for (auto bd : bds) {
+ BrokerData d;
+ d.brokerName = bd["brokerName"].asString();
+ LOG_DEBUG("brokerName:%s", d.brokerName.c_str());
+ Json::Value bas = bd["brokerAddrs"];
+ Json::Value::Members mbs = bas.getMemberNames();
+ for (const auto &key : mbs) {
+ int id = atoi(key.c_str());
+ string addr = bas[key].asString();
+ d.brokerAddrs[id] = addr;
+ LOG_DEBUG("brokerId:%d, brokerAddr:%s", id, addr.c_str());
+ }
+ trd->getBrokerDatas().push_back(d);
+ }
+ sort(trd->getBrokerDatas().begin(), trd->getBrokerDatas().end());
+
+ return trd;
}
- sort(trd->getQueueDatas().begin(), trd->getQueueDatas().end());
-
- Json::Value bds = root["brokerDatas"];
- for (unsigned int i = 0; i < bds.size(); i++) {
- BrokerData d;
- Json::Value bd = bds[i];
- d.brokerName = bd["brokerName"].asString();
-
- LOG_DEBUG("brokerName:%s", d.brokerName.c_str());
-
- Json::Value bas = bd["brokerAddrs"];
- Json::Value::Members mbs = bas.getMemberNames();
- for (size_t i = 0; i < mbs.size(); i++) {
- string key = mbs.at(i);
- LOG_DEBUG("brokerid:%s,brokerAddr:%s", key.c_str(),
- bas[key].asString().c_str());
- d.brokerAddrs[atoi(key.c_str())] = bas[key].asString();
- }
-
- trd->getBrokerDatas().push_back(d);
+ /**
+ * Selects a (preferably master) broker address from the registered list.
+ * If the master's address cannot be found, a slave broker address is selected in a random manner.
+ *
+ * @return Broker address.
+ */
+ std::string selectBrokerAddr() {
+ int bdSize = m_brokerDatas.size();
+ if (bdSize > 0) {
+ int bdIndex = std::rand() % bdSize;
+ auto bd = m_brokerDatas[bdIndex];
+ auto iter = bd.brokerAddrs.find(MASTER_ID);
+ if (iter == bd.brokerAddrs.end()) {
+ int baSize = bd.brokerAddrs.size();
+ int baIndex = std::rand() % baSize;
+ iter = bd.brokerAddrs.begin();
+ for (; baIndex > 0; baIndex--) {
+ iter++;
+ }
+ }
+ return iter->second;
+ }
+ return "";
}
- sort(trd->getBrokerDatas().begin(), trd->getBrokerDatas().end());
+ std::vector<QueueData> &getQueueDatas() { return m_queueDatas; }
- return trd;
- }
+ std::vector<BrokerData> &getBrokerDatas() { return m_brokerDatas; }
- string selectBrokerAddr() {
- vector<BrokerData>::iterator it = m_brokerDatas.begin();
- for (; it != m_brokerDatas.end(); ++it) {
- map<int, string>::iterator it1 = (*it).brokerAddrs.find(MASTER_ID);
- if (it1 != (*it).brokerAddrs.end()) {
- return it1->second;
- }
- }
- return "";
- }
+ const std::string &getOrderTopicConf() const { return m_orderTopicConf; }
+ void setOrderTopicConf(const string &orderTopicConf) { m_orderTopicConf = orderTopicConf; }
- vector<QueueData>& getQueueDatas() { return m_queueDatas; }
-
- vector<BrokerData>& getBrokerDatas() { return m_brokerDatas; }
-
- const string& getOrderTopicConf() const { return m_orderTopicConf; }
-
- void setOrderTopicConf(const string& orderTopicConf) {
- m_orderTopicConf = orderTopicConf;
- }
-
- bool operator==(const TopicRouteData& other) const {
- if (m_brokerDatas != other.m_brokerDatas) {
- return false;
+ bool operator==(const TopicRouteData &other) const {
+ return m_brokerDatas == other.m_brokerDatas && m_orderTopicConf == other.m_orderTopicConf &&
+ m_queueDatas == other.m_queueDatas;
}
- if (m_orderTopicConf != other.m_orderTopicConf) {
- return false;
- }
-
- if (m_queueDatas != other.m_queueDatas) {
- return false;
- }
- return true;
- }
-
- public:
- private:
- string m_orderTopicConf;
- vector<QueueData> m_queueDatas;
- vector<BrokerData> m_brokerDatas;
+ private:
+ std::string m_orderTopicConf;
+ std::vector<QueueData> m_queueDatas;
+ std::vector<BrokerData> m_brokerDatas;
};
-} //<!end namespace;
+} // namespace rocketmq
#endif