feat(client): add timer to clean off line broker and test case. (#222)
* fix(tranport): using recursive mutex to avoid death lock
* fix(transport): add test case for client factory.
* feat(unittest): open test case for client api impl
* feat(unittest): open test case for client api impl
diff --git a/.gitignore b/.gitignore
index 5693999..22c7350 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,3 +4,4 @@
build
libs/signature/lib
tmp_*
+Testing
diff --git a/build.sh b/build.sh
index ca1bc25..77ab32b 100755
--- a/build.sh
+++ b/build.sh
@@ -368,7 +368,11 @@
fi
echo "############# unit test start ###########"
cd ${build_dir}
- make test
+ if [ $verbose -eq 0 ]; then
+ ctest
+ else
+ ctest -V
+ fi
if [ $? -ne 0 ]; then
echo "############# unit test failed ###########"
exit 1
diff --git a/src/MQClientAPIImpl.cpp b/src/MQClientAPIImpl.cpp
index 0877a03..7520eb5 100644
--- a/src/MQClientAPIImpl.cpp
+++ b/src/MQClientAPIImpl.cpp
@@ -252,9 +252,9 @@
return SendResult();
}
-void MQClientAPIImpl::sendHearbeat(const string& addr,
- HeartbeatData* pHeartbeatData,
- const SessionCredentials& sessionCredentials) {
+void MQClientAPIImpl::sendHeartbeat(const string& addr,
+ HeartbeatData* pHeartbeatData,
+ const SessionCredentials& sessionCredentials) {
RemotingCommand request(HEART_BEAT, NULL);
string body;
@@ -265,7 +265,9 @@
request.Encode();
if (m_pRemotingClient->invokeHeartBeat(addr, request)) {
- LOG_INFO("sendheartbeat to broker:%s success", addr.c_str());
+ LOG_DEBUG("sendHeartbeat to broker:%s success", addr.c_str());
+ } else {
+ LOG_WARN("sendHeartbeat to broker:%s failed", addr.c_str());
}
}
@@ -314,6 +316,7 @@
}
}
case TOPIC_NOT_EXIST: {
+ LOG_WARN("Get topic[%s] route failed [TOPIC_NOT_EXIST].", topic.c_str());
return NULL;
}
default:
@@ -323,6 +326,7 @@
return NULL;
}
}
+ LOG_WARN("Get topic[%s] route failed [Null Response].", topic.c_str());
return NULL;
}
diff --git a/src/MQClientAPIImpl.h b/src/MQClientAPIImpl.h
index 9555d72..9f08dac 100644
--- a/src/MQClientAPIImpl.h
+++ b/src/MQClientAPIImpl.h
@@ -49,150 +49,152 @@
uint64_t tcpTransportTryLockTimeout,
string unitName);
virtual ~MQClientAPIImpl();
- void stopAllTcpTransportThread();
- bool writeDataToFile(string filename, string data, bool isSync);
- string fetchNameServerAddr(const string& NSDomain);
- void updateNameServerAddr(const string& addrs);
+ virtual void stopAllTcpTransportThread();
+ virtual bool writeDataToFile(string filename, string data, bool isSync);
+ virtual string fetchNameServerAddr(const string& NSDomain);
+ virtual void updateNameServerAddr(const string& addrs);
- void callSignatureBeforeRequest(const string& addr,
- RemotingCommand& request,
- const SessionCredentials& session_credentials);
- void createTopic(const string& addr,
- const string& defaultTopic,
- TopicConfig topicConfig,
- const SessionCredentials& sessionCredentials);
- void endTransactionOneway(std::string addr,
- EndTransactionRequestHeader* requestHeader,
- std::string remark,
- const SessionCredentials& sessionCredentials);
+ virtual void callSignatureBeforeRequest(const string& addr,
+ RemotingCommand& request,
+ const SessionCredentials& session_credentials);
+ virtual void createTopic(const string& addr,
+ const string& defaultTopic,
+ TopicConfig topicConfig,
+ const SessionCredentials& sessionCredentials);
+ virtual void endTransactionOneway(std::string addr,
+ EndTransactionRequestHeader* requestHeader,
+ std::string remark,
+ const SessionCredentials& sessionCredentials);
- SendResult sendMessage(const string& addr,
- const string& brokerName,
- const MQMessage& msg,
- SendMessageRequestHeader* pRequestHeader,
- int timeoutMillis,
- int maxRetrySendTimes,
- int communicationMode,
- SendCallback* pSendCallback,
- const SessionCredentials& sessionCredentials);
+ virtual SendResult sendMessage(const string& addr,
+ const string& brokerName,
+ const MQMessage& msg,
+ SendMessageRequestHeader* pRequestHeader,
+ int timeoutMillis,
+ int maxRetrySendTimes,
+ int communicationMode,
+ SendCallback* pSendCallback,
+ const SessionCredentials& sessionCredentials);
- PullResult* pullMessage(const string& addr,
- PullMessageRequestHeader* pRequestHeader,
- int timeoutMillis,
- int communicationMode,
- PullCallback* pullCallback,
- void* pArg,
- const SessionCredentials& sessionCredentials);
-
- void sendHearbeat(const string& addr, HeartbeatData* pHeartbeatData, const SessionCredentials& sessionCredentials);
-
- void unregisterClient(const string& addr,
- const string& clientID,
- const string& producerGroup,
- const string& consumerGroup,
- const SessionCredentials& sessionCredentials);
-
- TopicRouteData* getTopicRouteInfoFromNameServer(const string& topic,
- int timeoutMillis,
- const SessionCredentials& sessionCredentials);
-
- TopicList* getTopicListFromNameServer(const SessionCredentials& sessionCredentials);
-
- int wipeWritePermOfBroker(const string& namesrvAddr, const string& brokerName, int timeoutMillis);
-
- void deleteTopicInBroker(const string& addr, const string& topic, int timeoutMillis);
-
- void deleteTopicInNameServer(const string& addr, const string& topic, int timeoutMillis);
-
- void deleteSubscriptionGroup(const string& addr, const string& groupName, int timeoutMillis);
-
- string getKVConfigByValue(const string& projectNamespace, const string& projectGroup, int timeoutMillis);
-
- KVTable getKVListByNamespace(const string& projectNamespace, int timeoutMillis);
-
- void deleteKVConfigByValue(const string& projectNamespace, const string& projectGroup, int timeoutMillis);
-
- SendResult processSendResponse(const string& brokerName, const MQMessage& msg, RemotingCommand* pResponse);
-
- PullResult* processPullResponse(RemotingCommand* pResponse);
-
- int64 getMinOffset(const string& addr,
- const string& topic,
- int queueId,
- int timeoutMillis,
- const SessionCredentials& sessionCredentials);
-
- int64 getMaxOffset(const string& addr,
- const string& topic,
- int queueId,
- int timeoutMillis,
- const SessionCredentials& sessionCredentials);
-
- int64 searchOffset(const string& addr,
- const string& topic,
- int queueId,
- uint64_t timestamp,
- int timeoutMillis,
- const SessionCredentials& sessionCredentials);
-
- MQMessageExt* viewMessage(const string& addr,
- int64 phyoffset,
- int timeoutMillis,
- const SessionCredentials& sessionCredentials);
-
- int64 getEarliestMsgStoretime(const string& addr,
- const string& topic,
- int queueId,
- int timeoutMillis,
- const SessionCredentials& sessionCredentials);
-
- void getConsumerIdListByGroup(const string& addr,
- const string& consumerGroup,
- vector<string>& cids,
- int timeoutMillis,
- const SessionCredentials& sessionCredentials);
-
- int64 queryConsumerOffset(const string& addr,
- QueryConsumerOffsetRequestHeader* pRequestHeader,
- int timeoutMillis,
- const SessionCredentials& sessionCredentials);
-
- void updateConsumerOffset(const string& addr,
- UpdateConsumerOffsetRequestHeader* pRequestHeader,
- int timeoutMillis,
- const SessionCredentials& sessionCredentials);
-
- void updateConsumerOffsetOneway(const string& addr,
- UpdateConsumerOffsetRequestHeader* pRequestHeader,
+ virtual PullResult* pullMessage(const string& addr,
+ PullMessageRequestHeader* pRequestHeader,
int timeoutMillis,
+ int communicationMode,
+ PullCallback* pullCallback,
+ void* pArg,
const SessionCredentials& sessionCredentials);
- void consumerSendMessageBack(const string addr,
- MQMessageExt& msg,
- const string& consumerGroup,
- int delayLevel,
- int timeoutMillis,
- const SessionCredentials& sessionCredentials);
+ virtual void sendHeartbeat(const string& addr,
+ HeartbeatData* pHeartbeatData,
+ const SessionCredentials& sessionCredentials);
- void lockBatchMQ(const string& addr,
- LockBatchRequestBody* requestBody,
- vector<MQMessageQueue>& mqs,
- int timeoutMillis,
- const SessionCredentials& sessionCredentials);
+ virtual void unregisterClient(const string& addr,
+ const string& clientID,
+ const string& producerGroup,
+ const string& consumerGroup,
+ const SessionCredentials& sessionCredentials);
- void unlockBatchMQ(const string& addr,
- UnlockBatchRequestBody* requestBody,
- int timeoutMillis,
- const SessionCredentials& sessionCredentials);
+ virtual TopicRouteData* getTopicRouteInfoFromNameServer(const string& topic,
+ int timeoutMillis,
+ const SessionCredentials& sessionCredentials);
- void sendMessageAsync(const string& addr,
- const string& brokerName,
- const MQMessage& msg,
- RemotingCommand& request,
- SendCallback* pSendCallback,
- int64 timeoutMilliseconds,
- int maxRetryTimes = 1,
- int retrySendTimes = 1);
+ virtual TopicList* getTopicListFromNameServer(const SessionCredentials& sessionCredentials);
+
+ virtual int wipeWritePermOfBroker(const string& namesrvAddr, const string& brokerName, int timeoutMillis);
+
+ virtual void deleteTopicInBroker(const string& addr, const string& topic, int timeoutMillis);
+
+ virtual void deleteTopicInNameServer(const string& addr, const string& topic, int timeoutMillis);
+
+ virtual void deleteSubscriptionGroup(const string& addr, const string& groupName, int timeoutMillis);
+
+ virtual string getKVConfigByValue(const string& projectNamespace, const string& projectGroup, int timeoutMillis);
+
+ virtual KVTable getKVListByNamespace(const string& projectNamespace, int timeoutMillis);
+
+ virtual void deleteKVConfigByValue(const string& projectNamespace, const string& projectGroup, int timeoutMillis);
+
+ virtual SendResult processSendResponse(const string& brokerName, const MQMessage& msg, RemotingCommand* pResponse);
+
+ virtual PullResult* processPullResponse(RemotingCommand* pResponse);
+
+ virtual int64 getMinOffset(const string& addr,
+ const string& topic,
+ int queueId,
+ int timeoutMillis,
+ const SessionCredentials& sessionCredentials);
+
+ virtual int64 getMaxOffset(const string& addr,
+ const string& topic,
+ int queueId,
+ int timeoutMillis,
+ const SessionCredentials& sessionCredentials);
+
+ virtual int64 searchOffset(const string& addr,
+ const string& topic,
+ int queueId,
+ uint64_t timestamp,
+ int timeoutMillis,
+ const SessionCredentials& sessionCredentials);
+
+ virtual MQMessageExt* viewMessage(const string& addr,
+ int64 phyoffset,
+ int timeoutMillis,
+ const SessionCredentials& sessionCredentials);
+
+ virtual int64 getEarliestMsgStoretime(const string& addr,
+ const string& topic,
+ int queueId,
+ int timeoutMillis,
+ const SessionCredentials& sessionCredentials);
+
+ virtual void getConsumerIdListByGroup(const string& addr,
+ const string& consumerGroup,
+ vector<string>& cids,
+ int timeoutMillis,
+ const SessionCredentials& sessionCredentials);
+
+ virtual int64 queryConsumerOffset(const string& addr,
+ QueryConsumerOffsetRequestHeader* pRequestHeader,
+ int timeoutMillis,
+ const SessionCredentials& sessionCredentials);
+
+ virtual void updateConsumerOffset(const string& addr,
+ UpdateConsumerOffsetRequestHeader* pRequestHeader,
+ int timeoutMillis,
+ const SessionCredentials& sessionCredentials);
+
+ virtual void updateConsumerOffsetOneway(const string& addr,
+ UpdateConsumerOffsetRequestHeader* pRequestHeader,
+ int timeoutMillis,
+ const SessionCredentials& sessionCredentials);
+
+ virtual void consumerSendMessageBack(const string addr,
+ MQMessageExt& msg,
+ const string& consumerGroup,
+ int delayLevel,
+ int timeoutMillis,
+ const SessionCredentials& sessionCredentials);
+
+ virtual void lockBatchMQ(const string& addr,
+ LockBatchRequestBody* requestBody,
+ vector<MQMessageQueue>& mqs,
+ int timeoutMillis,
+ const SessionCredentials& sessionCredentials);
+
+ virtual void unlockBatchMQ(const string& addr,
+ UnlockBatchRequestBody* requestBody,
+ int timeoutMillis,
+ const SessionCredentials& sessionCredentials);
+
+ virtual void sendMessageAsync(const string& addr,
+ const string& brokerName,
+ const MQMessage& msg,
+ RemotingCommand& request,
+ SendCallback* pSendCallback,
+ int64 timeoutMilliseconds,
+ int maxRetryTimes = 1,
+ int retrySendTimes = 1);
private:
SendResult sendMessageSync(const string& addr,
@@ -213,8 +215,10 @@
PullCallback* pullCallback,
void* pArg);
- private:
+ protected:
unique_ptr<TcpRemotingClient> m_pRemotingClient;
+
+ private:
unique_ptr<TopAddressing> m_topAddressing;
string m_nameSrvAddr;
bool m_firstFetchNameSrv;
diff --git a/src/MQClientFactory.cpp b/src/MQClientFactory.cpp
index 03d4640..6315cb1 100644
--- a/src/MQClientFactory.cpp
+++ b/src/MQClientFactory.cpp
@@ -136,7 +136,7 @@
if (!isTopicInfoValidInTable(topic)) {
updateTopicRouteInfoFromNameServer(topic, session_credentials);
}
- //<!if not exsit ,update dafult topic;
+ //<!if not exist ,update default topic;
if (!isTopicInfoValidInTable(topic)) {
LOG_INFO("updateTopicRouteInfoFromNameServer with default");
updateTopicRouteInfoFromNameServer(topic, session_credentials, true);
@@ -156,7 +156,7 @@
bool isDefault /* = false */) {
boost::lock_guard<boost::mutex> lock(m_factoryLock);
unique_ptr<TopicRouteData> pTopicRouteData;
- LOG_INFO("updateTopicRouteInfoFromNameServer start:%s", topic.c_str());
+ LOG_DEBUG("updateTopicRouteInfoFromNameServer start. Topic:%s", topic.c_str());
if (isDefault) {
pTopicRouteData.reset(
@@ -176,7 +176,7 @@
}
if (pTopicRouteData != NULL) {
- LOG_INFO("updateTopicRouteInfoFromNameServer has data");
+ LOG_DEBUG("updateTopicRouteInfoFromNameServer has data");
TopicRouteData* pTemp = getTopicRouteData(topic);
bool changed = true;
if (pTemp != NULL) {
@@ -211,7 +211,7 @@
LOG_DEBUG("updateTopicRouteInfoFromNameServer end:%s", topic.c_str());
return true;
}
- LOG_DEBUG("updateTopicRouteInfoFromNameServer end null:%s", topic.c_str());
+ LOG_DEBUG("updateTopicRouteInfoFromNameServer end:%s", topic.c_str());
return false;
}
@@ -424,7 +424,7 @@
}
MQConsumer* MQClientFactory::selectConsumer(const string& group) {
- boost::lock_guard<boost::mutex> lock(m_consumerTableMutex);
+ boost::lock_guard<boost::recursive_mutex> lock(m_consumerTableMutex);
if (m_consumerTable.find(group) != m_consumerTable.end()) {
return m_consumerTable[group];
}
@@ -432,7 +432,7 @@
}
bool MQClientFactory::getSessionCredentialFromConsumerTable(SessionCredentials& sessionCredentials) {
- boost::lock_guard<boost::mutex> lock(m_consumerTableMutex);
+ boost::lock_guard<boost::recursive_mutex> lock(m_consumerTableMutex);
for (MQCMAP::iterator it = m_consumerTable.begin(); it != m_consumerTable.end(); ++it) {
if (it->second)
sessionCredentials = it->second->getSessionCredentials();
@@ -446,7 +446,7 @@
bool MQClientFactory::getSessionCredentialFromConsumer(const string& consumerGroup,
SessionCredentials& sessionCredentials) {
- boost::lock_guard<boost::mutex> lock(m_consumerTableMutex);
+ boost::lock_guard<boost::recursive_mutex> lock(m_consumerTableMutex);
if (m_consumerTable.find(consumerGroup) != m_consumerTable.end()) {
sessionCredentials = m_consumerTable[consumerGroup]->getSessionCredentials();
}
@@ -458,7 +458,7 @@
}
bool MQClientFactory::addConsumerToTable(const string& consumerName, MQConsumer* pMQConsumer) {
- boost::lock_guard<boost::mutex> lock(m_consumerTableMutex);
+ boost::lock_guard<boost::recursive_mutex> lock(m_consumerTableMutex);
if (m_consumerTable.find(consumerName) != m_consumerTable.end())
return false;
m_consumerTable[consumerName] = pMQConsumer;
@@ -466,7 +466,7 @@
}
void MQClientFactory::eraseConsumerFromTable(const string& consumerName) {
- boost::lock_guard<boost::mutex> lock(m_consumerTableMutex);
+ boost::lock_guard<boost::recursive_mutex> lock(m_consumerTableMutex);
if (m_consumerTable.find(consumerName) != m_consumerTable.end())
m_consumerTable.erase(consumerName); // do not need freee pConsumer, as it
// was allocated by user
@@ -475,12 +475,12 @@
}
int MQClientFactory::getConsumerTableSize() {
- boost::lock_guard<boost::mutex> lock(m_consumerTableMutex);
+ boost::lock_guard<boost::recursive_mutex> lock(m_consumerTableMutex);
return m_consumerTable.size();
}
void MQClientFactory::getTopicListFromConsumerSubscription(set<string>& topicList) {
- boost::lock_guard<boost::mutex> lock(m_consumerTableMutex);
+ boost::lock_guard<boost::recursive_mutex> lock(m_consumerTableMutex);
for (MQCMAP::iterator it = m_consumerTable.begin(); it != m_consumerTable.end(); ++it) {
vector<SubscriptionData> result;
it->second->getSubscriptions(result);
@@ -493,14 +493,14 @@
}
void MQClientFactory::updateConsumerSubscribeTopicInfo(const string& topic, vector<MQMessageQueue> mqs) {
- boost::lock_guard<boost::mutex> lock(m_consumerTableMutex);
+ boost::lock_guard<boost::recursive_mutex> lock(m_consumerTableMutex);
for (MQCMAP::iterator it = m_consumerTable.begin(); it != m_consumerTable.end(); ++it) {
it->second->updateTopicSubscribeInfo(topic, mqs);
}
}
void MQClientFactory::insertConsumerInfoToHeartBeatData(HeartbeatData* pHeartbeatData) {
- boost::lock_guard<boost::mutex> lock(m_consumerTableMutex);
+ boost::lock_guard<boost::recursive_mutex> lock(m_consumerTableMutex);
for (MQCMAP::iterator it = m_consumerTable.begin(); it != m_consumerTable.end(); ++it) {
MQConsumer* pConsumer = it->second;
ConsumerData consumerData;
@@ -563,6 +563,27 @@
m_brokerAddrTable.clear();
}
+bool MQClientFactory::isBrokerAddressInUse(const std::string& address) {
+ if (m_topicRouteTableMutex.try_lock()) {
+ boost::lock_guard<boost::mutex> lk(m_topicRouteTableMutex, boost::adopt_lock_t());
+ for (TRDMAP::iterator it = m_topicRouteTable.begin(); it != m_topicRouteTable.end(); it++) {
+ TopicRouteData* topicRouteData = it->second;
+ vector<BrokerData>& brokerData = topicRouteData->getBrokerDatas();
+ for (vector<BrokerData>::iterator next = brokerData.begin(); next != brokerData.end(); next++) {
+ map<int, string>& brokerAddresses = next->brokerAddrs;
+ for (map<int, string>::iterator entry = brokerAddresses.begin(); entry != brokerAddresses.end(); entry++) {
+ if (address == entry->second) {
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ } else {
+ LOG_WARN("Cannot lock m_topicRouteTableMutex. Assume %s is still in use", address.c_str());
+ return true;
+ }
+}
void MQClientFactory::addBrokerToAddrMap(const string& brokerName, map<int, string>& brokerAddrs) {
boost::lock_guard<boost::mutex> lock(m_brokerAddrlock);
if (m_brokerAddrTable.find(brokerName) != m_brokerAddrTable.end()) {
@@ -697,6 +718,35 @@
return m_pClientAPIImpl.get();
}
+void MQClientFactory::cleanOfflineBrokers() {
+ LOG_DEBUG("Begin to clean offline brokers");
+ boost::lock_guard<boost::mutex> lock(m_brokerAddrlock);
+
+ for (BrokerAddrMAP::iterator it = m_brokerAddrTable.begin(); it != m_brokerAddrTable.end();) {
+ std::string brokerName = it->first;
+ map<int, std::string> brokerIdAddressMap = it->second;
+
+ for (map<int, std::string>::iterator next = brokerIdAddressMap.begin(); next != brokerIdAddressMap.end();) {
+ if (!isBrokerAddressInUse(next->second)) {
+ LOG_INFO("Remove broker address: %s", (next->second).c_str());
+ brokerIdAddressMap.erase(next++);
+ } else {
+ next++;
+ }
+ }
+
+ if (brokerIdAddressMap.empty()) {
+ m_brokerAddrTable.erase(it++);
+ LOG_INFO("Broker name: %s is purged from client", brokerName.c_str());
+ } else {
+ LOG_DEBUG("Broker: %s is alive", brokerName.c_str());
+ it++;
+ }
+ }
+
+ LOG_DEBUG("Exit of cleaning offline brokers");
+}
+
void MQClientFactory::sendHeartbeatToAllBroker() {
BrokerAddrMAP brokerTable(getBrokerAddrMap());
if (brokerTable.size() == 0) {
@@ -724,7 +774,7 @@
continue;
try {
- m_pClientAPIImpl->sendHearbeat(addr, heartbeatData.get(), session_credentials);
+ m_pClientAPIImpl->sendHeartbeat(addr, heartbeatData.get(), session_credentials);
} catch (MQException& e) {
LOG_ERROR(e.what());
}
@@ -735,7 +785,7 @@
void MQClientFactory::persistAllConsumerOffset(boost::system::error_code& ec, boost::asio::deadline_timer* t) {
{
- boost::lock_guard<boost::mutex> lock(m_consumerTableMutex);
+ boost::lock_guard<boost::recursive_mutex> lock(m_consumerTableMutex);
if (m_consumerTable.size() > 0) {
for (MQCMAP::iterator it = m_consumerTable.begin(); it != m_consumerTable.end(); ++it) {
LOG_DEBUG("Client factory start persistAllConsumerOffset");
@@ -771,6 +821,14 @@
t->async_wait(boost::bind(&MQClientFactory::timerCB_sendHeartbeatToAllBroker, this, ec, t));
}
+void MQClientFactory::timerCB_cleanOfflineBrokers(boost::system::error_code& ec, boost::asio::deadline_timer* t) {
+ cleanOfflineBrokers();
+
+ boost::system::error_code e;
+ t->expires_from_now(t->expires_from_now() + boost::posix_time::seconds(30), e);
+ t->async_wait(boost::bind(&MQClientFactory::timerCB_cleanOfflineBrokers, this, ec, t));
+}
+
void MQClientFactory::fetchNameServerAddr(boost::system::error_code& ec, boost::asio::deadline_timer* t) {
m_pClientAPIImpl->fetchNameServerAddr(m_nameSrvDomain);
@@ -793,6 +851,10 @@
boost::asio::deadline_timer t2(m_async_ioService, boost::posix_time::milliseconds(10));
t2.async_wait(boost::bind(&MQClientFactory::timerCB_sendHeartbeatToAllBroker, this, ec2, &t2));
+ boost::system::error_code ec3;
+ boost::asio::deadline_timer t3(m_async_ioService, boost::posix_time::seconds(3));
+ t3.async_wait(boost::bind(&MQClientFactory::timerCB_cleanOfflineBrokers, this, ec3, &t3));
+
if (startFetchNSService) {
boost::system::error_code ec5;
boost::asio::deadline_timer* t5 =
@@ -843,18 +905,18 @@
}
void MQClientFactory::doRebalance() {
- LOG_INFO("Client factory:%s start dorebalance", m_clientId.c_str());
+ LOG_DEBUG("Client factory:%s start doRebalance", m_clientId.c_str());
if (getConsumerTableSize() > 0) {
- boost::lock_guard<boost::mutex> lock(m_consumerTableMutex);
+ boost::lock_guard<boost::recursive_mutex> lock(m_consumerTableMutex);
for (MQCMAP::iterator it = m_consumerTable.begin(); it != m_consumerTable.end(); ++it) {
it->second->doRebalance();
}
}
- LOG_INFO("Client factory:%s finish dorebalance", m_clientId.c_str());
+ LOG_DEBUG("Client factory:%s finish doRebalance", m_clientId.c_str());
}
void MQClientFactory::doRebalanceByConsumerGroup(const string& consumerGroup) {
- boost::lock_guard<boost::mutex> lock(m_consumerTableMutex);
+ boost::lock_guard<boost::recursive_mutex> lock(m_consumerTableMutex);
if (m_consumerTable.find(consumerGroup) != m_consumerTable.end()) {
LOG_INFO("Client factory:%s start dorebalance for consumer:%s", m_clientId.c_str(), consumerGroup.c_str());
MQConsumer* pMQConsumer = m_consumerTable[consumerGroup];
@@ -1125,9 +1187,9 @@
getSessionCredentialFromConsumerTable(session_credentials);
if (!session_credentials.isValid()) {
- LOG_ERROR(
+ LOG_INFO(
"updateTopicRouteInfo: didn't get the session_credentials from any "
- "producers and consumers, please re-intialize it");
+ "producers and consumers, please re-intialize it if application needs authentication");
}
}
diff --git a/src/MQClientFactory.h b/src/MQClientFactory.h
index e0d0efd..b5e5441 100644
--- a/src/MQClientFactory.h
+++ b/src/MQClientFactory.h
@@ -20,6 +20,7 @@
#include <boost/asio/io_service.hpp>
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
+#include <boost/thread/recursive_mutex.hpp>
#include <boost/thread/thread.hpp>
#include "FindBrokerResult.h"
#include "MQClientAPIImpl.h"
@@ -103,6 +104,8 @@
void doRebalanceByConsumerGroup(const string& consumerGroup);
void sendHeartbeatToAllBroker();
+ void cleanOfflineBrokers();
+
void findConsumerIds(const string& topic,
const string& group,
vector<string>& cids,
@@ -114,6 +117,8 @@
map<string, map<int, string>> getBrokerAddrMap();
void clearBrokerAddrMap();
+ bool isBrokerAddressInUse(const std::string& address);
+
private:
void unregisterClient(const string& producerGroup,
const string& consumerGroup,
@@ -128,6 +133,8 @@
void updateTopicRouteInfo(boost::system::error_code& ec, boost::asio::deadline_timer* t);
void timerCB_sendHeartbeatToAllBroker(boost::system::error_code& ec, boost::asio::deadline_timer* t);
+ void timerCB_cleanOfflineBrokers(boost::system::error_code& ec, boost::asio::deadline_timer* t);
+
// consumer related operation
void consumer_timerOperation();
void persistAllConsumerOffset(boost::system::error_code& ec, boost::asio::deadline_timer* t);
@@ -157,8 +164,12 @@
void getSessionCredentialsFromOneOfProducerOrConsumer(SessionCredentials& session_credentials);
- private:
+ protected:
string m_clientId;
+ unique_ptr<MQClientAPIImpl> m_pClientAPIImpl;
+ unique_ptr<ClientRemotingProcessor> m_pClientRemotingProcessor;
+
+ private:
string m_nameSrvDomain; // per clientId
ServiceState m_serviceState;
bool m_bFetchNSService;
@@ -170,7 +181,8 @@
//<! group --> MQConsumer;
typedef map<string, MQConsumer*> MQCMAP;
- boost::mutex m_consumerTableMutex;
+ // Changed to recursive mutex due to avoid deadlock issue:
+ boost::recursive_mutex m_consumerTableMutex;
MQCMAP m_consumerTable;
//<! Topic---> TopicRouteData
@@ -192,10 +204,6 @@
boost::mutex m_factoryLock;
boost::mutex m_topicPublishInfoLock;
- //<!clientapi;
- unique_ptr<MQClientAPIImpl> m_pClientAPIImpl;
- unique_ptr<ClientRemotingProcessor> m_pClientRemotingProcessor;
-
boost::asio::io_service m_async_ioService;
unique_ptr<boost::thread> m_async_service_thread;
diff --git a/src/MQClientManager.cpp b/src/MQClientManager.cpp
index 2a658c8..de35ddd 100644
--- a/src/MQClientManager.cpp
+++ b/src/MQClientManager.cpp
@@ -54,4 +54,4 @@
}
}
//<!************************************************************************
-} //<!end namespace;
+} // namespace rocketmq
diff --git a/src/MQClientManager.h b/src/MQClientManager.h
index fc253d7..d846f96 100644
--- a/src/MQClientManager.h
+++ b/src/MQClientManager.h
@@ -27,11 +27,11 @@
class MQClientManager {
public:
virtual ~MQClientManager();
- MQClientFactory* getMQClientFactory(const string& clientId,
- int pullThreadNum,
- uint64_t tcpConnectTimeout,
- uint64_t tcpTransportTryLockTimeout,
- string unitName);
+ virtual MQClientFactory* getMQClientFactory(const string& clientId,
+ int pullThreadNum,
+ uint64_t tcpConnectTimeout,
+ uint64_t tcpTransportTryLockTimeout,
+ string unitName);
void removeClientFactory(const string& clientId);
static MQClientManager* getInstance();
@@ -45,6 +45,6 @@
};
//<!***************************************************************************
-} //<!end namespace;
+} // namespace rocketmq
#endif
diff --git a/src/common/TopAddressing.h b/src/common/TopAddressing.h
index 80ad170..a698862 100644
--- a/src/common/TopAddressing.h
+++ b/src/common/TopAddressing.h
@@ -31,7 +31,7 @@
virtual ~TopAddressing();
public:
- string fetchNSAddr(const string& NSDomain);
+ virtual string fetchNSAddr(const string& NSDomain);
private:
string clearNewLine(const string& str);
@@ -43,5 +43,5 @@
list<string> m_addrs;
string m_unitName;
};
-}
+} // namespace rocketmq
#endif
diff --git a/src/transport/TcpRemotingClient.h b/src/transport/TcpRemotingClient.h
old mode 100755
new mode 100644
index c612e9d..265177f
--- a/src/transport/TcpRemotingClient.h
+++ b/src/transport/TcpRemotingClient.h
@@ -39,24 +39,24 @@
TcpRemotingClient(int pullThreadNum, uint64_t tcpConnectTimeout, uint64_t tcpTransportTryLockTimeout);
virtual ~TcpRemotingClient();
- void stopAllTcpTransportThread();
- void updateNameServerAddressList(const string& addrs);
+ virtual void stopAllTcpTransportThread();
+ virtual void updateNameServerAddressList(const string& addrs);
- bool invokeHeartBeat(const string& addr, RemotingCommand& request, int timeoutMillis = 3000);
+ virtual bool invokeHeartBeat(const string& addr, RemotingCommand& request, int timeoutMillis = 3000);
// delete outsite;
- RemotingCommand* invokeSync(const string& addr, RemotingCommand& request, int timeoutMillis = 3000);
+ virtual RemotingCommand* invokeSync(const string& addr, RemotingCommand& request, int timeoutMillis = 3000);
- bool invokeAsync(const string& addr,
- RemotingCommand& request,
- AsyncCallbackWrap* cbw,
- int64 timeoutMilliseconds,
- int maxRetrySendTimes = 1,
- int retrySendTimes = 1);
+ virtual bool invokeAsync(const string& addr,
+ RemotingCommand& request,
+ AsyncCallbackWrap* cbw,
+ int64 timeoutMilliseconds,
+ int maxRetrySendTimes = 1,
+ int retrySendTimes = 1);
- void invokeOneway(const string& addr, RemotingCommand& request);
+ virtual void invokeOneway(const string& addr, RemotingCommand& request);
- void registerProcessor(MQRequestCode requestCode, ClientRemotingProcessor* clientRemotingProcessor);
+ virtual void registerProcessor(MQRequestCode requestCode, ClientRemotingProcessor* clientRemotingProcessor);
private:
static void static_messageReceived(void* context, const MemoryBlock& mem, const string& addr);
diff --git a/test/src/MQClientAPIImpTest.cpp b/test/src/MQClientAPIImpTest.cpp
new file mode 100644
index 0000000..cf63f67
--- /dev/null
+++ b/test/src/MQClientAPIImpTest.cpp
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "gmock/gmock.h"
+#include "gtest/gtest.h"
+
+#include "CommunicationMode.h"
+#include "MQClientAPIImpl.h"
+#include "MQClientException.h"
+
+using namespace std;
+using namespace rocketmq;
+using rocketmq::CommunicationMode;
+using rocketmq::RemotingCommand;
+using rocketmq::TcpRemotingClient;
+using testing::_;
+using ::testing::InitGoogleMock;
+using ::testing::InitGoogleTest;
+using testing::Mock;
+using testing::Return;
+
+class MockTcpRemotingClient : public TcpRemotingClient {
+ public:
+ MockTcpRemotingClient(int pullThreadNum, uint64_t tcpConnectTimeout, uint64_t tcpTransportTryLockTimeout)
+ : TcpRemotingClient(pullThreadNum, tcpConnectTimeout, tcpTransportTryLockTimeout) {}
+
+ MOCK_METHOD3(invokeSync, RemotingCommand*(const string&, RemotingCommand&, int));
+};
+class MockMQClientAPIImpl : public MQClientAPIImpl {
+ public:
+ MockMQClientAPIImpl(const string& mqClientId,
+ ClientRemotingProcessor* clientRemotingProcessor,
+ int pullThreadNum,
+ uint64_t tcpConnectTimeout,
+ uint64_t tcpTransportTryLockTimeout,
+ string unitName)
+ : MQClientAPIImpl(mqClientId,
+ clientRemotingProcessor,
+ pullThreadNum,
+ tcpConnectTimeout,
+ tcpTransportTryLockTimeout,
+ unitName) {
+ m_processor = clientRemotingProcessor;
+ }
+ ClientRemotingProcessor* m_processor;
+ void reInitRemoteClient(TcpRemotingClient* client) {
+ m_pRemotingClient.reset(client);
+ m_pRemotingClient->registerProcessor(CHECK_TRANSACTION_STATE, m_processor);
+ m_pRemotingClient->registerProcessor(RESET_CONSUMER_CLIENT_OFFSET, m_processor);
+ m_pRemotingClient->registerProcessor(GET_CONSUMER_STATUS_FROM_CLIENT, m_processor);
+ m_pRemotingClient->registerProcessor(GET_CONSUMER_RUNNING_INFO, m_processor);
+ m_pRemotingClient->registerProcessor(NOTIFY_CONSUMER_IDS_CHANGED, m_processor);
+ m_pRemotingClient->registerProcessor(CONSUME_MESSAGE_DIRECTLY, m_processor);
+ }
+};
+class MockMQClientAPIImplUtil {
+ public:
+ static MockMQClientAPIImplUtil* GetInstance() {
+ static MockMQClientAPIImplUtil instance;
+ return &instance;
+ }
+ MockMQClientAPIImpl* GetGtestMockClientAPIImpl() {
+ if (m_impl != nullptr) {
+ return m_impl;
+ }
+ string cid = "testClientId";
+ int ptN = 1;
+ uint64_t tct = 3000;
+ uint64_t ttt = 3000;
+ string un = "central";
+ SessionCredentials sc;
+ ClientRemotingProcessor* pp = new ClientRemotingProcessor(nullptr);
+ MockMQClientAPIImpl* impl = new MockMQClientAPIImpl(cid, pp, ptN, tct, ttt, un);
+ MockTcpRemotingClient* pClient = new MockTcpRemotingClient(ptN, tct, ttt);
+ impl->reInitRemoteClient(pClient);
+ m_impl = impl;
+ m_pClient = pClient;
+ return impl;
+ }
+ MockTcpRemotingClient* GetGtestMockRemotingClient() { return m_pClient; }
+ MockMQClientAPIImpl* m_impl = nullptr;
+ MockTcpRemotingClient* m_pClient = nullptr;
+};
+
+TEST(MQClientAPIImplTest, getMaxOffset) {
+ SessionCredentials sc;
+ MockMQClientAPIImpl* impl = MockMQClientAPIImplUtil::GetInstance()->GetGtestMockClientAPIImpl();
+ Mock::AllowLeak(impl);
+ MockTcpRemotingClient* pClient = MockMQClientAPIImplUtil::GetInstance()->GetGtestMockRemotingClient();
+ Mock::AllowLeak(pClient);
+ GetMaxOffsetResponseHeader* pHead = new GetMaxOffsetResponseHeader();
+ pHead->offset = 4096;
+ RemotingCommand* pCommandFailed = new RemotingCommand(SYSTEM_ERROR, nullptr);
+ RemotingCommand* pCommandSuccuss = new RemotingCommand(SUCCESS_VALUE, pHead);
+ EXPECT_CALL(*pClient, invokeSync(_, _, _))
+ .Times(3)
+ .WillOnce(Return(nullptr))
+ .WillOnce(Return(pCommandFailed))
+ .WillOnce(Return(pCommandSuccuss));
+ EXPECT_ANY_THROW(impl->getMaxOffset("127.0.0.0:10911", "testTopic", 0, 1000, sc));
+ EXPECT_ANY_THROW(impl->getMaxOffset("127.0.0.0:10911", "testTopic", 0, 1000, sc));
+ int64 offset = impl->getMaxOffset("127.0.0.0:10911", "testTopic", 0, 1000, sc);
+ EXPECT_EQ(4096, offset);
+}
+
+TEST(MQClientAPIImplTest, getMinOffset) {
+ SessionCredentials sc;
+ MockMQClientAPIImpl* impl = MockMQClientAPIImplUtil::GetInstance()->GetGtestMockClientAPIImpl();
+ Mock::AllowLeak(impl);
+ MockTcpRemotingClient* pClient = MockMQClientAPIImplUtil::GetInstance()->GetGtestMockRemotingClient();
+ Mock::AllowLeak(pClient);
+ GetMinOffsetResponseHeader* pHead = new GetMinOffsetResponseHeader();
+ pHead->offset = 2048;
+ RemotingCommand* pCommandFailed = new RemotingCommand(SYSTEM_ERROR, nullptr);
+ RemotingCommand* pCommandSuccuss = new RemotingCommand(SUCCESS_VALUE, pHead);
+ EXPECT_CALL(*pClient, invokeSync(_, _, _))
+ .Times(3)
+ .WillOnce(Return(nullptr))
+ .WillOnce(Return(pCommandFailed))
+ .WillOnce(Return(pCommandSuccuss));
+ EXPECT_ANY_THROW(impl->getMinOffset("127.0.0.0:10911", "testTopic", 0, 1000, sc));
+ EXPECT_ANY_THROW(impl->getMinOffset("127.0.0.0:10911", "testTopic", 0, 1000, sc));
+ int64 offset = impl->getMinOffset("127.0.0.0:10911", "testTopic", 0, 1000, sc);
+ EXPECT_EQ(2048, offset);
+}
+
+TEST(MQClientAPIImplTest, sendMessage) {
+ string cid = "testClientId";
+ SessionCredentials sc;
+ MockMQClientAPIImpl* impl = MockMQClientAPIImplUtil::GetInstance()->GetGtestMockClientAPIImpl();
+ Mock::AllowLeak(impl);
+ MockTcpRemotingClient* pClient = MockMQClientAPIImplUtil::GetInstance()->GetGtestMockRemotingClient();
+ Mock::AllowLeak(pClient);
+
+ SendMessageResponseHeader* pHead = new SendMessageResponseHeader();
+ pHead->msgId = "MessageID";
+ pHead->queueId = 1;
+ pHead->queueOffset = 409600;
+ RemotingCommand* pCommandSync = new RemotingCommand(SUCCESS_VALUE, pHead);
+ EXPECT_CALL(*pClient, invokeSync(_, _, _)).Times(1).WillOnce(Return(pCommandSync));
+ MQMessage message("testTopic", "Hello, RocketMQ");
+ string unique_msgId = "UniqMessageID";
+ message.setProperty(MQMessage::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, unique_msgId);
+ SendMessageRequestHeader* requestHeader = new SendMessageRequestHeader();
+ requestHeader->producerGroup = cid;
+ requestHeader->topic = (message.getTopic());
+ requestHeader->defaultTopic = DEFAULT_TOPIC;
+ requestHeader->defaultTopicQueueNums = 4;
+ requestHeader->bornTimestamp = UtilAll::currentTimeMillis();
+ SendResult result =
+ impl->sendMessage("127.0.0.0:10911", "testBroker", message, requestHeader, 100, 1, ComMode_SYNC, nullptr, sc);
+ EXPECT_EQ(result.getSendStatus(), SEND_OK);
+ EXPECT_EQ(result.getMsgId(), unique_msgId);
+ EXPECT_EQ(result.getQueueOffset(), 409600);
+ EXPECT_EQ(result.getOffsetMsgId(), "MessageID");
+ EXPECT_EQ(result.getMessageQueue().getBrokerName(), "testBroker");
+ EXPECT_EQ(result.getMessageQueue().getTopic(), "testTopic");
+}
+int main(int argc, char* argv[]) {
+ InitGoogleMock(&argc, argv);
+ testing::GTEST_FLAG(filter) = "MQClientAPIImplTest.*";
+ return RUN_ALL_TESTS();
+}
diff --git a/test/src/MQClientFactoryTest.cpp b/test/src/MQClientFactoryTest.cpp
new file mode 100644
index 0000000..11490e5
--- /dev/null
+++ b/test/src/MQClientFactoryTest.cpp
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <map>
+
+#include "gmock/gmock.h"
+#include "gtest/gtest.h"
+
+#include "MQClientFactory.h"
+
+using namespace std;
+using namespace rocketmq;
+using rocketmq::MQClientFactory;
+using rocketmq::TopicRouteData;
+using testing::_;
+using ::testing::InitGoogleMock;
+using ::testing::InitGoogleTest;
+using testing::Return;
+
+class MockMQClientAPIImpl : public MQClientAPIImpl {
+ public:
+ MockMQClientAPIImpl(const string& mqClientId,
+ ClientRemotingProcessor* clientRemotingProcessor,
+ int pullThreadNum,
+ uint64_t tcpConnectTimeout,
+ uint64_t tcpTransportTryLockTimeout,
+ string unitName)
+ : MQClientAPIImpl(mqClientId,
+ clientRemotingProcessor,
+ pullThreadNum,
+ tcpConnectTimeout,
+ tcpTransportTryLockTimeout,
+ unitName) {}
+
+ MOCK_METHOD5(getMinOffset, int64(const string&, const string&, int, int, const SessionCredentials&));
+ MOCK_METHOD3(getTopicRouteInfoFromNameServer, TopicRouteData*(const string&, int, const SessionCredentials&));
+};
+class MockMQClientFactory : public MQClientFactory {
+ public:
+ MockMQClientFactory(const string& mqClientId,
+ int pullThreadNum,
+ uint64_t tcpConnectTimeout,
+ uint64_t tcpTransportTryLockTimeout,
+ string unitName)
+ : MQClientFactory(mqClientId, pullThreadNum, tcpConnectTimeout, tcpTransportTryLockTimeout, unitName) {}
+ void reInitClientImpl(MQClientAPIImpl* pImpl) { m_pClientAPIImpl.reset(pImpl); }
+ void reInitRemotingProcessor(ClientRemotingProcessor* pImpl) { m_pClientRemotingProcessor.reset(pImpl); }
+ ClientRemotingProcessor* getRemotingProcessor() { return m_pClientRemotingProcessor.release(); }
+};
+
+TEST(MQClientFactoryTest, minOffset) {
+ string clientId = "testClientId";
+ int pullThreadNum = 1;
+ uint64_t tcpConnectTimeout = 3000;
+ uint64_t tcpTransportTryLockTimeout = 3000;
+ string unitName = "central";
+ MockMQClientFactory* factory =
+ new MockMQClientFactory(clientId, pullThreadNum, tcpConnectTimeout, tcpTransportTryLockTimeout, unitName);
+ MockMQClientAPIImpl* pImpl = new MockMQClientAPIImpl(clientId, factory->getRemotingProcessor(), pullThreadNum,
+ tcpConnectTimeout, tcpTransportTryLockTimeout, unitName);
+ factory->reInitClientImpl(pImpl);
+ MQMessageQueue mq;
+ mq.setTopic("testTopic");
+ mq.setBrokerName("testBroker");
+ mq.setQueueId(1);
+ SessionCredentials session_credentials;
+
+ TopicRouteData* pData = new TopicRouteData();
+ pData->setOrderTopicConf("OrderTopicConf");
+ QueueData qd;
+ qd.brokerName = "testBroker";
+ qd.readQueueNums = 8;
+ qd.writeQueueNums = 8;
+ qd.perm = 1;
+ pData->getQueueDatas().push_back(qd);
+ BrokerData bd;
+ bd.brokerName = "testBroker";
+ bd.brokerAddrs[0] = "127.0.0.1:10091";
+ bd.brokerAddrs[1] = "127.0.0.2:10092";
+ pData->getBrokerDatas().push_back(bd);
+
+ EXPECT_CALL(*pImpl, getMinOffset(_, _, _, _, _)).Times(1).WillOnce(Return(1024));
+ EXPECT_CALL(*pImpl, getTopicRouteInfoFromNameServer(_, _, _)).Times(1).WillOnce(Return(pData));
+ int64 offset = factory->minOffset(mq, session_credentials);
+ EXPECT_EQ(1024, offset);
+ delete factory;
+}
+
+int main(int argc, char* argv[]) {
+ InitGoogleMock(&argc, argv);
+ return RUN_ALL_TESTS();
+}