[ISSUE #89] it will crash when starting orderly push consumer. (#108)
[ISSUE #89] it will crash when starting orderly push consumer. (#108)
diff --git a/src/consumer/Rebalance.cpp b/src/consumer/Rebalance.cpp
index 3460457..b4e6360 100644
--- a/src/consumer/Rebalance.cpp
+++ b/src/consumer/Rebalance.cpp
@@ -275,6 +275,10 @@
unique_ptr<FindBrokerResult> pFindBrokerResult(
m_pClientFactory->findBrokerAddressInSubscribe(itb->first, MASTER_ID,
true));
+ if (!pFindBrokerResult) {
+ LOG_ERROR("unlockAll findBrokerAddressInSubscribe ret null for broker:%s", itb->first.data());
+ continue;
+ }
unique_ptr<UnlockBatchRequestBody> unlockBatchRequest(
new UnlockBatchRequestBody());
vector<MQMessageQueue> mqs(*(itb->second));
@@ -307,6 +311,10 @@
unique_ptr<FindBrokerResult> pFindBrokerResult(
m_pClientFactory->findBrokerAddressInSubscribe(mq.getBrokerName(),
MASTER_ID, true));
+ if (!pFindBrokerResult) {
+ LOG_ERROR("unlock findBrokerAddressInSubscribe ret null for broker:%s", mq.getBrokerName().data());
+ return;
+ }
unique_ptr<UnlockBatchRequestBody> unlockBatchRequest(
new UnlockBatchRequestBody());
vector<MQMessageQueue> mqs;
@@ -352,9 +360,14 @@
LOG_INFO("LockAll " SIZET_FMT " broker mqs", brokerMqs.size());
for (map<string, vector<MQMessageQueue>*>::iterator itb = brokerMqs.begin();
itb != brokerMqs.end(); ++itb) {
+ string brokerName = (*(itb->second))[0].getBrokerName();
unique_ptr<FindBrokerResult> pFindBrokerResult(
m_pClientFactory->findBrokerAddressInSubscribe(
- (*(itb->second))[0].getBrokerName(), MASTER_ID, true));
+ brokerName, MASTER_ID, true));
+ if (!pFindBrokerResult) {
+ LOG_ERROR("lockAll findBrokerAddressInSubscribe ret null for broker:%s", brokerName.data());
+ continue;
+ }
unique_ptr<LockBatchRequestBody> lockBatchRequest(
new LockBatchRequestBody());
lockBatchRequest->setClientId(m_pConsumer->getMQClientId());
@@ -391,6 +404,10 @@
unique_ptr<FindBrokerResult> pFindBrokerResult(
m_pClientFactory->findBrokerAddressInSubscribe(mq.getBrokerName(),
MASTER_ID, true));
+ if (!pFindBrokerResult) {
+ LOG_ERROR("lock findBrokerAddressInSubscribe ret null for broker:%s", mq.getBrokerName().data());
+ return false;
+ }
unique_ptr<LockBatchRequestBody> lockBatchRequest(new LockBatchRequestBody());
lockBatchRequest->setClientId(m_pConsumer->getMQClientId());
lockBatchRequest->setConsumerGroup(m_pConsumer->getGroupName());