| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| #include "MQClientInstance.h" |
| |
| #include <typeindex> |
| |
| #include "ClientRemotingProcessor.h" |
| #include "Logging.h" |
| #include "MQAdminImpl.h" |
| #include "MQClientAPIImpl.h" |
| #include "MQClientManager.h" |
| #include "MQVersion.h" |
| #include "PermName.h" |
| #include "PullMessageService.hpp" |
| #include "PullRequest.h" |
| #include "RebalanceImpl.h" |
| #include "RebalanceService.h" |
| #include "TcpRemotingClient.h" |
| #include "TopicPublishInfo.hpp" |
| #include "UtilAll.h" |
| #include "protocol/body/ConsumerRunningInfo.h" |
| |
| namespace rocketmq { |
| |
// Maximum time, in milliseconds, that functions in this file wait when trying to acquire a timed mutex.
static constexpr long LOCK_TIMEOUT_MILLIS = 3000L;
| |
| MQClientInstance::MQClientInstance(const MQClientConfig& clientConfig, const std::string& clientId) |
| : MQClientInstance(clientConfig, clientId, nullptr) {} |
| |
| MQClientInstance::MQClientInstance(const MQClientConfig& clientConfig, const std::string& clientId, RPCHookPtr rpcHook) |
| : client_id_(clientId), |
| rebalance_service_(new RebalanceService(this)), |
| pull_message_service_(new PullMessageService(this)), |
| scheduled_executor_service_("MQClient", false) { |
| // default Topic register |
| TopicPublishInfoPtr defaultTopicInfo(new TopicPublishInfo()); |
| topic_publish_info_table_[AUTO_CREATE_TOPIC_KEY_TOPIC] = defaultTopicInfo; |
| |
| client_remoting_processor_.reset(new ClientRemotingProcessor(this)); |
| mq_client_api_impl_.reset(new MQClientAPIImpl(client_remoting_processor_.get(), rpcHook, clientConfig)); |
| |
| std::string namesrvAddr = clientConfig.namesrv_addr(); |
| if (!namesrvAddr.empty()) { |
| mq_client_api_impl_->updateNameServerAddressList(namesrvAddr); |
| LOG_INFO_NEW("user specified name server address: {}", namesrvAddr); |
| } |
| |
| mq_admin_impl_.reset(new MQAdminImpl(this)); |
| |
| service_state_ = CREATE_JUST; |
| LOG_DEBUG_NEW("MQClientInstance construct"); |
| } |
| |
| MQClientInstance::~MQClientInstance() { |
| LOG_INFO_NEW("MQClientInstance:{} destruct", client_id_); |
| |
| // UNNECESSARY: |
| producer_table_.clear(); |
| consumer_table_.clear(); |
| topic_publish_info_table_.clear(); |
| topic_route_table_.clear(); |
| broker_addr_table_.clear(); |
| |
| mq_client_api_impl_ = nullptr; |
| } |
| |
| std::string MQClientInstance::getNamesrvAddr() const { |
| auto namesrvAddrs = mq_client_api_impl_->getRemotingClient()->getNameServerAddressList(); |
| std::ostringstream oss; |
| for (const auto& addr : namesrvAddrs) { |
| oss << addr << ";"; |
| } |
| return oss.str(); |
| } |
| |
| TopicPublishInfoPtr MQClientInstance::topicRouteData2TopicPublishInfo(const std::string& topic, |
| TopicRouteDataPtr route) { |
| auto info = std::make_shared<TopicPublishInfo>(); |
| info->setTopicRouteData(route); |
| |
| auto& mqList = const_cast<TopicPublishInfo::QueuesVec&>(info->getMessageQueueList()); |
| |
| std::string orderTopicConf = route->order_topic_conf(); |
| if (!orderTopicConf.empty()) { // order msg |
| // "broker-a:8";"broker-b:8" |
| std::vector<std::string> brokers; |
| UtilAll::Split(brokers, orderTopicConf, ';'); |
| for (const auto& broker : brokers) { |
| std::vector<std::string> item; |
| UtilAll::Split(item, broker, ':'); |
| int nums = atoi(item[1].c_str()); |
| for (int i = 0; i < nums; i++) { |
| mqList.emplace_back(topic, item[0], i); |
| } |
| } |
| info->setOrderTopic(true); |
| } else { // no order msg |
| const auto& qds = route->queue_datas(); |
| for (const auto& qd : qds) { |
| if (PermName::isWriteable(qd.perm())) { |
| const BrokerData* brokerData = nullptr; |
| for (const auto& bd : route->broker_datas()) { |
| if (bd.broker_name() == qd.broker_name()) { |
| brokerData = &bd; |
| break; |
| } |
| } |
| |
| if (nullptr == brokerData) { |
| LOG_WARN_NEW("MQClientInstance: broker:{} of topic:{} have not data", qd.broker_name(), topic); |
| continue; |
| } |
| |
| if (brokerData->broker_addrs().find(MASTER_ID) == brokerData->broker_addrs().end()) { |
| LOG_WARN_NEW("MQClientInstance: broker:{} of topic:{} have not master node", qd.broker_name(), topic); |
| continue; |
| } |
| |
| for (int i = 0; i < qd.write_queue_nums(); i++) { |
| mqList.emplace_back(topic, qd.broker_name(), i); |
| } |
| } |
| } |
| |
| // sort, make brokerName is staggered. |
| std::sort(mqList.begin(), mqList.end(), [](const MQMessageQueue& a, const MQMessageQueue& b) { |
| auto result = a.queue_id() - b.queue_id(); |
| if (result == 0) { |
| result = a.broker_name().compare(b.broker_name()); |
| } |
| return result < 0; |
| }); |
| |
| info->setOrderTopic(false); |
| } |
| |
| return info; |
| } |
| |
| std::vector<MQMessageQueue> MQClientInstance::topicRouteData2TopicSubscribeInfo(const std::string& topic, |
| TopicRouteDataPtr route) { |
| std::vector<MQMessageQueue> mqList; |
| const auto& queueDatas = route->queue_datas(); |
| for (const auto& qd : queueDatas) { |
| if (PermName::isReadable(qd.perm())) { |
| for (int i = 0; i < qd.read_queue_nums(); i++) { |
| MQMessageQueue mq(topic, qd.broker_name(), i); |
| mqList.push_back(mq); |
| } |
| } |
| } |
| return mqList; |
| } |
| |
| void MQClientInstance::start() { |
| switch (service_state_) { |
| case CREATE_JUST: |
| LOG_INFO_NEW("the client instance [{}] is starting", client_id_); |
| service_state_ = START_FAILED; |
| |
| mq_client_api_impl_->start(); |
| |
| // start various schedule tasks |
| startScheduledTask(); |
| |
| // start pull service |
| pull_message_service_->start(); |
| |
| // start rebalance service |
| rebalance_service_->start(); |
| |
| LOG_INFO_NEW("the client instance [{}] start OK", client_id_); |
| service_state_ = RUNNING; |
| break; |
| case RUNNING: |
| LOG_INFO_NEW("the client instance [{}] already running.", client_id_, service_state_); |
| break; |
| case SHUTDOWN_ALREADY: |
| case START_FAILED: |
| LOG_INFO_NEW("the client instance [{}] start failed with fault state:{}", client_id_, service_state_); |
| break; |
| default: |
| break; |
| } |
| } |
| |
| void MQClientInstance::shutdown() { |
| if (getConsumerTableSize() != 0) { |
| return; |
| } |
| |
| if (getProducerTableSize() != 0) { |
| return; |
| } |
| |
| switch (service_state_) { |
| case CREATE_JUST: |
| break; |
| case RUNNING: { |
| service_state_ = SHUTDOWN_ALREADY; |
| pull_message_service_->shutdown(); |
| scheduled_executor_service_.shutdown(); |
| mq_client_api_impl_->shutdown(); |
| rebalance_service_->shutdown(); |
| |
| MQClientManager::getInstance()->removeMQClientInstance(client_id_); |
| LOG_INFO_NEW("the client instance [{}] shutdown OK", client_id_); |
| } break; |
| case SHUTDOWN_ALREADY: |
| break; |
| default: |
| break; |
| } |
| } |
| |
| bool MQClientInstance::isRunning() { |
| return service_state_ == RUNNING; |
| } |
| |
| void MQClientInstance::startScheduledTask() { |
| LOG_INFO_NEW("start scheduled task:{}", client_id_); |
| scheduled_executor_service_.startup(); |
| |
| // updateTopicRouteInfoFromNameServer |
| scheduled_executor_service_.schedule(std::bind(&MQClientInstance::updateTopicRouteInfoPeriodically, this), 10, |
| time_unit::milliseconds); |
| |
| // sendHeartbeatToAllBroker |
| scheduled_executor_service_.schedule(std::bind(&MQClientInstance::sendHeartbeatToAllBrokerPeriodically, this), 1000, |
| time_unit::milliseconds); |
| |
| // persistAllConsumerOffset |
| scheduled_executor_service_.schedule(std::bind(&MQClientInstance::persistAllConsumerOffsetPeriodically, this), |
| 1000 * 10, time_unit::milliseconds); |
| } |
| |
| void MQClientInstance::updateTopicRouteInfoPeriodically() { |
| updateTopicRouteInfoFromNameServer(); |
| |
| // next round |
| scheduled_executor_service_.schedule(std::bind(&MQClientInstance::updateTopicRouteInfoPeriodically, this), 1000 * 30, |
| time_unit::milliseconds); |
| } |
| |
| void MQClientInstance::sendHeartbeatToAllBrokerPeriodically() { |
| cleanOfflineBroker(); |
| sendHeartbeatToAllBrokerWithLock(); |
| |
| // next round |
| scheduled_executor_service_.schedule(std::bind(&MQClientInstance::sendHeartbeatToAllBrokerPeriodically, this), |
| 1000 * 30, time_unit::milliseconds); |
| } |
| |
| void MQClientInstance::persistAllConsumerOffsetPeriodically() { |
| persistAllConsumerOffset(); |
| |
| // next round |
| scheduled_executor_service_.schedule(std::bind(&MQClientInstance::persistAllConsumerOffsetPeriodically, this), |
| 1000 * 5, time_unit::milliseconds); |
| } |
| |
| const std::string& MQClientInstance::getClientId() const { |
| return client_id_; |
| } |
| |
| void MQClientInstance::updateTopicRouteInfoFromNameServer() { |
| std::set<std::string> topicList; |
| |
| // Consumer |
| getTopicListFromConsumerSubscription(topicList); |
| |
| // Producer |
| getTopicListFromTopicPublishInfo(topicList); |
| |
| // update |
| if (!topicList.empty()) { |
| for (const auto& topic : topicList) { |
| updateTopicRouteInfoFromNameServer(topic); |
| } |
| } |
| } |
| |
| void MQClientInstance::cleanOfflineBroker() { |
| if (UtilAll::try_lock_for(lock_namesrv_, LOCK_TIMEOUT_MILLIS)) { |
| std::lock_guard<std::timed_mutex> lock(lock_namesrv_, std::adopt_lock); |
| |
| std::set<std::string> offlineBrokers; |
| BrokerAddrMAP updatedTable(getBrokerAddrTable()); |
| for (auto itBrokerTable = updatedTable.begin(); itBrokerTable != updatedTable.end();) { |
| const auto& brokerName = itBrokerTable->first; |
| auto& cloneAddrTable = itBrokerTable->second; |
| |
| for (auto it = cloneAddrTable.begin(); it != cloneAddrTable.end();) { |
| const auto& addr = it->second; |
| if (!isBrokerAddrExistInTopicRouteTable(addr)) { |
| offlineBrokers.insert(addr); |
| it = cloneAddrTable.erase(it); |
| LOG_INFO_NEW("the broker addr[{} {}] is offline, remove it", brokerName, addr); |
| } else { |
| it++; |
| } |
| } |
| |
| if (cloneAddrTable.empty()) { |
| itBrokerTable = updatedTable.erase(itBrokerTable); |
| LOG_INFO_NEW("the broker[{}] name's host is offline, remove it", brokerName); |
| } else { |
| itBrokerTable++; |
| } |
| } |
| |
| if (offlineBrokers.size() > 0) { |
| resetBrokerAddrTable(std::move(updatedTable)); |
| |
| std::lock_guard<std::mutex> lock(topic_broker_addr_table_mutex_); |
| for (auto it = topic_broker_addr_table_.begin(); it != topic_broker_addr_table_.end();) { |
| if (offlineBrokers.find(it->second.first) != offlineBrokers.end()) { |
| it = topic_broker_addr_table_.erase(it); |
| } else { |
| it++; |
| } |
| } |
| } |
| } else { |
| LOG_WARN_NEW("lock namesrv, but failed."); |
| } |
| } |
| |
| bool MQClientInstance::isBrokerAddrExistInTopicRouteTable(const std::string& addr) { |
| std::lock_guard<std::mutex> lock(topic_route_table_mutex_); |
| for (const auto& it : topic_route_table_) { |
| const auto topicRouteData = it.second; |
| const auto& bds = topicRouteData->broker_datas(); |
| for (const auto& bd : bds) { |
| for (const auto& itAddr : bd.broker_addrs()) { |
| if (itAddr.second == addr) { |
| return true; |
| } |
| } |
| } |
| } |
| return false; |
| } |
| |
| void MQClientInstance::sendHeartbeatToAllBrokerWithLock() { |
| if (lock_heartbeat_.try_lock()) { |
| std::lock_guard<std::timed_mutex> lock(lock_heartbeat_, std::adopt_lock); |
| sendHeartbeatToAllBroker(); |
| } else { |
| LOG_WARN_NEW("lock heartBeat, but failed."); |
| } |
| } |
| |
| void MQClientInstance::persistAllConsumerOffset() { |
| std::lock_guard<std::mutex> lock(consumer_table_mutex_); |
| for (const auto& it : consumer_table_) { |
| LOG_DEBUG_NEW("the client instance [{}] start persistAllConsumerOffset", client_id_); |
| it.second->persistConsumerOffset(); |
| } |
| } |
| |
| void MQClientInstance::sendHeartbeatToAllBroker() { |
| std::unique_ptr<HeartbeatData> heartbeatData(prepareHeartbeatData()); |
| bool producerEmpty = heartbeatData->producer_data_set().empty(); |
| bool consumerEmpty = heartbeatData->consumer_data_set().empty(); |
| if (producerEmpty && consumerEmpty) { |
| LOG_WARN_NEW("sending heartbeat, but no consumer and no producer"); |
| return; |
| } |
| |
| auto brokerAddrTable = getBrokerAddrTable(); |
| if (!brokerAddrTable.empty()) { |
| for (const auto& it : brokerAddrTable) { |
| // const auto& brokerName = it.first; |
| const auto& oneTable = it.second; |
| for (const auto& it2 : oneTable) { |
| const auto id = it2.first; |
| const auto& addr = it2.second; |
| if (consumerEmpty && id != MASTER_ID) { |
| continue; |
| } |
| |
| try { |
| mq_client_api_impl_->sendHearbeat(addr, heartbeatData.get(), 3000); |
| } catch (const MQException& e) { |
| LOG_ERROR_NEW("{}", e.what()); |
| } |
| } |
| } |
| brokerAddrTable.clear(); |
| } else { |
| LOG_WARN_NEW("sendheartbeat brokerAddrTable is empty"); |
| } |
| } |
| |
| bool MQClientInstance::updateTopicRouteInfoFromNameServer(const std::string& topic, bool isDefault) { |
| if (UtilAll::try_lock_for(lock_namesrv_, LOCK_TIMEOUT_MILLIS)) { |
| std::lock_guard<std::timed_mutex> lock(lock_namesrv_, std::adopt_lock); |
| LOG_DEBUG_NEW("updateTopicRouteInfoFromNameServer start:{}", topic); |
| |
| try { |
| TopicRouteDataPtr topicRouteData; |
| if (isDefault) { |
| topicRouteData = mq_client_api_impl_->getTopicRouteInfoFromNameServer(AUTO_CREATE_TOPIC_KEY_TOPIC, 1000 * 3); |
| if (topicRouteData != nullptr) { |
| auto& queueDatas = topicRouteData->queue_datas(); |
| for (auto& qd : queueDatas) { |
| int queueNums = std::min(4, qd.read_queue_nums()); |
| qd.set_read_queue_nums(queueNums); |
| qd.set_write_queue_nums(queueNums); |
| } |
| } |
| LOG_DEBUG_NEW("getTopicRouteInfoFromNameServer is null for topic: {}", topic); |
| } else { |
| topicRouteData = mq_client_api_impl_->getTopicRouteInfoFromNameServer(topic, 1000 * 3); |
| } |
| if (topicRouteData != nullptr) { |
| LOG_INFO_NEW("updateTopicRouteInfoFromNameServer has data"); |
| auto old = getTopicRouteData(topic); |
| bool changed = topicRouteDataIsChange(old.get(), topicRouteData.get()); |
| |
| if (changed) { |
| LOG_INFO_NEW("updateTopicRouteInfoFromNameServer changed:{}", topic); |
| |
| // update broker addr |
| const auto& brokerDatas = topicRouteData->broker_datas(); |
| for (const auto& bd : brokerDatas) { |
| LOG_INFO_NEW("updateTopicRouteInfoFromNameServer changed with broker name:{}", bd.broker_name()); |
| addBrokerToAddrTable(bd.broker_name(), bd.broker_addrs()); |
| } |
| |
| // update publish info |
| { |
| TopicPublishInfoPtr publishInfo(topicRouteData2TopicPublishInfo(topic, topicRouteData)); |
| updateProducerTopicPublishInfo(topic, publishInfo); |
| } |
| |
| // update subscribe info |
| if (getConsumerTableSize() > 0) { |
| std::vector<MQMessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData); |
| updateConsumerTopicSubscribeInfo(topic, subscribeInfo); |
| } |
| |
| addTopicRouteData(topic, topicRouteData); |
| } |
| |
| LOG_DEBUG_NEW("updateTopicRouteInfoFromNameServer end:{}", topic); |
| return true; |
| } else { |
| LOG_WARN_NEW("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", |
| topic); |
| } |
| } catch (const std::exception& e) { |
| if (!UtilAll::isRetryTopic(topic) && topic != AUTO_CREATE_TOPIC_KEY_TOPIC) { |
| LOG_WARN_NEW("updateTopicRouteInfoFromNameServer Exception, {}", e.what()); |
| } |
| } |
| } else { |
| LOG_WARN_NEW("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS); |
| } |
| |
| return false; |
| } |
| |
| std::unique_ptr<HeartbeatData> MQClientInstance::prepareHeartbeatData() { |
| std::unique_ptr<HeartbeatData> heartbeat_data(new HeartbeatData()); |
| |
| // clientID |
| heartbeat_data->set_client_id(client_id_); |
| |
| // Consumer |
| insertConsumerInfoToHeartBeatData(heartbeat_data.get()); |
| |
| // Producer |
| insertProducerInfoToHeartBeatData(heartbeat_data.get()); |
| |
| return heartbeat_data; |
| } |
| |
| void MQClientInstance::insertConsumerInfoToHeartBeatData(HeartbeatData* heartbeatData) { |
| std::lock_guard<std::mutex> lock(consumer_table_mutex_); |
| for (const auto& it : consumer_table_) { |
| const auto* consumer = it.second; |
| // TODO: unitMode |
| heartbeatData->consumer_data_set().emplace_back(consumer->groupName(), consumer->consumeType(), |
| consumer->messageModel(), consumer->consumeFromWhere(), |
| consumer->subscriptions()); |
| } |
| } |
| |
| void MQClientInstance::insertProducerInfoToHeartBeatData(HeartbeatData* heartbeatData) { |
| std::lock_guard<std::mutex> lock(producer_table_mutex_); |
| for (const auto& it : producer_table_) { |
| heartbeatData->producer_data_set().emplace_back(it.first); |
| } |
| } |
| |
| bool MQClientInstance::topicRouteDataIsChange(TopicRouteData* olddata, TopicRouteData* nowdata) { |
| if (olddata == nullptr || nowdata == nullptr) { |
| return true; |
| } |
| return !(*olddata == *nowdata); |
| } |
| |
| TopicRouteDataPtr MQClientInstance::getTopicRouteData(const std::string& topic) { |
| std::lock_guard<std::mutex> lock(topic_route_table_mutex_); |
| const auto& it = topic_route_table_.find(topic); |
| if (it != topic_route_table_.end()) { |
| return it->second; |
| } |
| return nullptr; |
| } |
| |
| void MQClientInstance::addTopicRouteData(const std::string& topic, TopicRouteDataPtr topicRouteData) { |
| std::lock_guard<std::mutex> lock(topic_route_table_mutex_); |
| topic_route_table_[topic] = topicRouteData; |
| } |
| |
| bool MQClientInstance::registerConsumer(const std::string& group, MQConsumerInner* consumer) { |
| if (group.empty()) { |
| return false; |
| } |
| |
| if (!addConsumerToTable(group, consumer)) { |
| LOG_WARN_NEW("the consumer group[{}] exist already.", group); |
| return false; |
| } |
| |
| LOG_DEBUG_NEW("registerConsumer success:{}", group); |
| return true; |
| } |
| |
| void MQClientInstance::unregisterConsumer(const std::string& group) { |
| eraseConsumerFromTable(group); |
| unregisterClientWithLock(null, group); |
| } |
| |
| void MQClientInstance::unregisterClientWithLock(const std::string& producerGroup, const std::string& consumerGroup) { |
| if (UtilAll::try_lock_for(lock_heartbeat_, LOCK_TIMEOUT_MILLIS)) { |
| std::lock_guard<std::timed_mutex> lock(lock_heartbeat_, std::adopt_lock); |
| |
| try { |
| unregisterClient(producerGroup, consumerGroup); |
| } catch (const std::exception& e) { |
| LOG_ERROR_NEW("unregisterClient exception: {}", e.what()); |
| } |
| } else { |
| LOG_WARN_NEW("lock heartBeat, but failed."); |
| } |
| } |
| |
| void MQClientInstance::unregisterClient(const std::string& producerGroup, const std::string& consumerGroup) { |
| BrokerAddrMAP brokerAddrTable(getBrokerAddrTable()); |
| for (const auto& it : brokerAddrTable) { |
| const auto& brokerName = it.first; |
| const auto& oneTable = it.second; |
| for (const auto& it2 : oneTable) { |
| const auto& index = it2.first; |
| const auto& addr = it2.second; |
| try { |
| mq_client_api_impl_->unregisterClient(addr, client_id_, producerGroup, consumerGroup); |
| LOG_INFO_NEW("unregister client[Producer: {} Consumer: {}] from broker[{} {} {}] success", producerGroup, |
| consumerGroup, brokerName, index, addr); |
| } catch (const std::exception& e) { |
| LOG_ERROR_NEW("unregister client exception from broker: {}. EXCEPTION: {}", addr, e.what()); |
| } |
| } |
| } |
| } |
| |
| bool MQClientInstance::registerProducer(const std::string& group, MQProducerInner* producer) { |
| if (group.empty()) { |
| return false; |
| } |
| |
| if (!addProducerToTable(group, producer)) { |
| LOG_WARN_NEW("the consumer group[{}] exist already.", group); |
| return false; |
| } |
| |
| LOG_DEBUG_NEW("registerProducer success:{}", group); |
| return true; |
| } |
| |
| void MQClientInstance::unregisterProducer(const std::string& group) { |
| eraseProducerFromTable(group); |
| unregisterClientWithLock(group, null); |
| } |
| |
| void MQClientInstance::rebalanceImmediately() { |
| rebalance_service_->wakeup(); |
| } |
| |
| void MQClientInstance::doRebalance() { |
| LOG_INFO_NEW("the client instance:{} start doRebalance", client_id_); |
| if (getConsumerTableSize() > 0) { |
| std::lock_guard<std::mutex> lock(consumer_table_mutex_); |
| for (auto& it : consumer_table_) { |
| it.second->doRebalance(); |
| } |
| } |
| LOG_INFO_NEW("the client instance [{}] finish doRebalance", client_id_); |
| } |
| |
| void MQClientInstance::doRebalanceByConsumerGroup(const std::string& consumerGroup) { |
| std::lock_guard<std::mutex> lock(consumer_table_mutex_); |
| const auto& it = consumer_table_.find(consumerGroup); |
| if (it != consumer_table_.end()) { |
| try { |
| LOG_INFO_NEW("the client instance [{}] start doRebalance for consumer [{}]", client_id_, consumerGroup); |
| auto* consumer = it->second; |
| consumer->doRebalance(); |
| } catch (const std::exception& e) { |
| LOG_ERROR_NEW("{}", e.what()); |
| } |
| } |
| } |
| |
| MQProducerInner* MQClientInstance::selectProducer(const std::string& producerName) { |
| std::lock_guard<std::mutex> lock(producer_table_mutex_); |
| const auto& it = producer_table_.find(producerName); |
| if (it != producer_table_.end()) { |
| return it->second; |
| } |
| return nullptr; |
| } |
| |
| bool MQClientInstance::addProducerToTable(const std::string& producerName, MQProducerInner* producer) { |
| std::lock_guard<std::mutex> lock(producer_table_mutex_); |
| if (producer_table_.find(producerName) != producer_table_.end()) { |
| return false; |
| } else { |
| producer_table_[producerName] = producer; |
| return true; |
| } |
| } |
| |
| void MQClientInstance::eraseProducerFromTable(const std::string& producerName) { |
| std::lock_guard<std::mutex> lock(producer_table_mutex_); |
| const auto& it = producer_table_.find(producerName); |
| if (it != producer_table_.end()) { |
| producer_table_.erase(it); |
| } |
| } |
| |
| int MQClientInstance::getProducerTableSize() { |
| std::lock_guard<std::mutex> lock(producer_table_mutex_); |
| return producer_table_.size(); |
| } |
| |
| void MQClientInstance::getTopicListFromTopicPublishInfo(std::set<std::string>& topicList) { |
| std::lock_guard<std::mutex> lock(topic_publish_info_table_mutex_); |
| for (const auto& it : topic_publish_info_table_) { |
| topicList.insert(it.first); |
| } |
| } |
| |
| void MQClientInstance::updateProducerTopicPublishInfo(const std::string& topic, TopicPublishInfoPtr publishInfo) { |
| addTopicInfoToTable(topic, publishInfo); |
| } |
| |
| MQConsumerInner* MQClientInstance::selectConsumer(const std::string& group) { |
| std::lock_guard<std::mutex> lock(consumer_table_mutex_); |
| const auto& it = consumer_table_.find(group); |
| if (it != consumer_table_.end()) { |
| return it->second; |
| } |
| return nullptr; |
| } |
| |
| bool MQClientInstance::addConsumerToTable(const std::string& consumerName, MQConsumerInner* consumer) { |
| std::lock_guard<std::mutex> lock(consumer_table_mutex_); |
| if (consumer_table_.find(consumerName) != consumer_table_.end()) { |
| return false; |
| } else { |
| consumer_table_[consumerName] = consumer; |
| return true; |
| } |
| } |
| |
| void MQClientInstance::eraseConsumerFromTable(const std::string& consumerName) { |
| std::lock_guard<std::mutex> lock(consumer_table_mutex_); |
| const auto& it = consumer_table_.find(consumerName); |
| if (it != consumer_table_.end()) { |
| consumer_table_.erase(it); // do not need free consumer, as it was allocated by user |
| } else { |
| LOG_WARN_NEW("could not find consumer:{} from table", consumerName); |
| } |
| } |
| |
| int MQClientInstance::getConsumerTableSize() { |
| std::lock_guard<std::mutex> lock(consumer_table_mutex_); |
| return consumer_table_.size(); |
| } |
| |
| void MQClientInstance::getTopicListFromConsumerSubscription(std::set<std::string>& topicList) { |
| std::lock_guard<std::mutex> lock(consumer_table_mutex_); |
| for (const auto& it : consumer_table_) { |
| std::vector<SubscriptionData> result = it.second->subscriptions(); |
| for (const auto& sd : result) { |
| topicList.insert(sd.topic()); |
| } |
| } |
| } |
| |
| void MQClientInstance::updateConsumerTopicSubscribeInfo(const std::string& topic, |
| std::vector<MQMessageQueue> subscribeInfo) { |
| std::lock_guard<std::mutex> lock(consumer_table_mutex_); |
| for (auto& it : consumer_table_) { |
| it.second->updateTopicSubscribeInfo(topic, subscribeInfo); |
| } |
| } |
| |
| void MQClientInstance::addTopicInfoToTable(const std::string& topic, TopicPublishInfoPtr topicPublishInfo) { |
| std::lock_guard<std::mutex> lock(topic_publish_info_table_mutex_); |
| topic_publish_info_table_[topic] = topicPublishInfo; |
| } |
| |
| void MQClientInstance::eraseTopicInfoFromTable(const std::string& topic) { |
| std::lock_guard<std::mutex> lock(topic_publish_info_table_mutex_); |
| const auto& it = topic_publish_info_table_.find(topic); |
| if (it != topic_publish_info_table_.end()) { |
| topic_publish_info_table_.erase(it); |
| } |
| } |
| |
| TopicPublishInfoPtr MQClientInstance::getTopicPublishInfoFromTable(const std::string& topic) { |
| std::lock_guard<std::mutex> lock(topic_publish_info_table_mutex_); |
| const auto& it = topic_publish_info_table_.find(topic); |
| if (it != topic_publish_info_table_.end()) { |
| return it->second; |
| } |
| return nullptr; |
| } |
| |
| bool MQClientInstance::isTopicInfoValidInTable(const std::string& topic) { |
| std::lock_guard<std::mutex> lock(topic_publish_info_table_mutex_); |
| return topic_publish_info_table_.find(topic) != topic_publish_info_table_.end(); |
| } |
| |
| TopicPublishInfoPtr MQClientInstance::tryToFindTopicPublishInfo(const std::string& topic) { |
| auto topicPublishInfo = getTopicPublishInfoFromTable(topic); |
| if (nullptr == topicPublishInfo) { |
| updateTopicRouteInfoFromNameServer(topic); |
| topicPublishInfo = getTopicPublishInfoFromTable(topic); |
| } |
| |
| if (nullptr != topicPublishInfo && topicPublishInfo->ok()) { |
| return topicPublishInfo; |
| } else { |
| LOG_INFO_NEW("updateTopicRouteInfoFromNameServer with default"); |
| updateTopicRouteInfoFromNameServer(topic, true); |
| return getTopicPublishInfoFromTable(topic); |
| } |
| } |
| |
| std::unique_ptr<FindBrokerResult> MQClientInstance::findBrokerAddressInAdmin(const std::string& brokerName) { |
| BrokerAddrMAP brokerTable(getBrokerAddrTable()); |
| bool found = false; |
| bool slave = false; |
| std::string brokerAddr; |
| |
| const auto& it = brokerTable.find(brokerName); |
| if (it != brokerTable.end()) { |
| const auto& brokerMap = it->second; |
| const auto& it1 = brokerMap.begin(); |
| if (it1 != brokerMap.end()) { |
| slave = (it1->first != MASTER_ID); |
| found = true; |
| brokerAddr = it1->second; |
| } |
| } |
| |
| brokerTable.clear(); |
| if (found) { |
| return std::unique_ptr<FindBrokerResult>(new FindBrokerResult(brokerAddr, slave)); |
| } |
| |
| return nullptr; |
| } |
| |
| std::string MQClientInstance::findBrokerAddressInPublish(const std::string& brokerName) { |
| BrokerAddrMAP brokerTable(getBrokerAddrTable()); |
| std::string brokerAddr; |
| bool found = false; |
| |
| const auto& it = brokerTable.find(brokerName); |
| if (it != brokerTable.end()) { |
| const auto& brokerMap = it->second; |
| const auto& it1 = brokerMap.find(MASTER_ID); |
| if (it1 != brokerMap.end()) { |
| brokerAddr = it1->second; |
| found = true; |
| } |
| } |
| |
| brokerTable.clear(); |
| if (found) { |
| return brokerAddr; |
| } |
| |
| return null; |
| } |
| |
| std::unique_ptr<FindBrokerResult> MQClientInstance::findBrokerAddressInSubscribe(const std::string& brokerName, |
| int brokerId, |
| bool onlyThisBroker) { |
| std::string brokerAddr; |
| bool slave = false; |
| bool found = false; |
| BrokerAddrMAP brokerTable(getBrokerAddrTable()); |
| |
| const auto& it = brokerTable.find(brokerName); |
| if (it != brokerTable.end()) { |
| const auto& brokerMap = it->second; |
| if (!brokerMap.empty()) { |
| const auto& it1 = brokerMap.find(brokerId); |
| if (it1 != brokerMap.end()) { |
| brokerAddr = it1->second; |
| slave = it1->first != MASTER_ID; |
| found = true; |
| } else if (!onlyThisBroker) { // not only from master |
| const auto& it2 = brokerMap.begin(); |
| brokerAddr = it2->second; |
| slave = it2->first != MASTER_ID; |
| found = true; |
| } |
| } |
| } |
| |
| brokerTable.clear(); |
| |
| if (found) { |
| return std::unique_ptr<FindBrokerResult>(new FindBrokerResult(brokerAddr, slave)); |
| } |
| |
| return nullptr; |
| } |
| |
| void MQClientInstance::findConsumerIds(const std::string& topic, |
| const std::string& group, |
| std::vector<std::string>& cids) { |
| std::string brokerAddr; |
| |
| // find consumerIds from same broker every 40s |
| { |
| std::lock_guard<std::mutex> lock(topic_broker_addr_table_mutex_); |
| const auto& it = topic_broker_addr_table_.find(topic); |
| if (it != topic_broker_addr_table_.end()) { |
| if (UtilAll::currentTimeMillis() < it->second.second + 120000) { |
| brokerAddr = it->second.first; |
| } |
| } |
| } |
| |
| if (brokerAddr.empty()) { |
| // select new one |
| brokerAddr = findBrokerAddrByTopic(topic); |
| if (brokerAddr.empty()) { |
| updateTopicRouteInfoFromNameServer(topic); |
| brokerAddr = findBrokerAddrByTopic(topic); |
| } |
| |
| if (!brokerAddr.empty()) { |
| std::lock_guard<std::mutex> lock(topic_broker_addr_table_mutex_); |
| topic_broker_addr_table_[topic] = std::make_pair(brokerAddr, UtilAll::currentTimeMillis()); |
| } |
| } |
| |
| if (!brokerAddr.empty()) { |
| try { |
| LOG_INFO_NEW("getConsumerIdList from broker:{}", brokerAddr); |
| return mq_client_api_impl_->getConsumerIdListByGroup(brokerAddr, group, cids, 5000); |
| } catch (const MQException& e) { |
| LOG_ERROR_NEW("encounter exception when getConsumerIdList: {}", e.what()); |
| |
| std::lock_guard<std::mutex> lock(topic_broker_addr_table_mutex_); |
| topic_broker_addr_table_.erase(topic); |
| } |
| } |
| } |
| |
| std::string MQClientInstance::findBrokerAddrByTopic(const std::string& topic) { |
| auto topicRouteData = getTopicRouteData(topic); |
| if (topicRouteData != nullptr) { |
| return topicRouteData->selectBrokerAddr(); |
| } |
| return ""; |
| } |
| |
| void MQClientInstance::resetOffset(const std::string& group, |
| const std::string& topic, |
| const std::map<MQMessageQueue, int64_t>& offsetTable) { |
| DefaultMQPushConsumerImpl* consumer = nullptr; |
| try { |
| auto* impl = selectConsumer(group); |
| if (impl != nullptr && std::type_index(typeid(*impl)) == std::type_index(typeid(DefaultMQPushConsumerImpl))) { |
| consumer = static_cast<DefaultMQPushConsumerImpl*>(impl); |
| } else { |
| LOG_INFO_NEW("[reset-offset] consumer dose not exist. group={}", group); |
| return; |
| } |
| consumer->suspend(); |
| |
| auto processQueueTable = consumer->getRebalanceImpl()->getProcessQueueTable(); |
| for (const auto& it : processQueueTable) { |
| const auto& mq = it.first; |
| if (topic == mq.topic() && offsetTable.find(mq) != offsetTable.end()) { |
| auto pq = it.second; |
| pq->set_dropped(true); |
| pq->clearAllMsgs(); |
| } |
| } |
| |
| std::this_thread::sleep_for(std::chrono::seconds(10)); |
| |
| for (const auto& it : processQueueTable) { |
| const auto& mq = it.first; |
| const auto& it2 = offsetTable.find(mq); |
| if (it2 != offsetTable.end()) { |
| auto offset = it2->second; |
| consumer->updateConsumeOffset(mq, offset); |
| consumer->getRebalanceImpl()->removeUnnecessaryMessageQueue(mq, it.second); |
| consumer->getRebalanceImpl()->removeProcessQueueDirectly(mq); |
| } |
| } |
| } catch (...) { |
| if (consumer != nullptr) { |
| consumer->resume(); |
| } |
| throw; |
| } |
| if (consumer != nullptr) { |
| consumer->resume(); |
| } |
| } |
| |
| std::unique_ptr<ConsumerRunningInfo> MQClientInstance::consumerRunningInfo(const std::string& consumerGroup) { |
| auto* consumer = selectConsumer(consumerGroup); |
| if (consumer != nullptr) { |
| std::unique_ptr<ConsumerRunningInfo> runningInfo(consumer->consumerRunningInfo()); |
| if (runningInfo != nullptr) { |
| std::string nsAddr = getNamesrvAddr(); |
| runningInfo->setProperty(ConsumerRunningInfo::PROP_NAMESERVER_ADDR, nsAddr); |
| |
| if (consumer->consumeType() == CONSUME_PASSIVELY) { |
| runningInfo->setProperty(ConsumerRunningInfo::PROP_CONSUME_TYPE, "CONSUME_PASSIVELY"); |
| } else { |
| runningInfo->setProperty(ConsumerRunningInfo::PROP_CONSUME_TYPE, "CONSUME_ACTIVELY"); |
| } |
| |
| runningInfo->setProperty(ConsumerRunningInfo::PROP_CLIENT_VERSION, |
| MQVersion::GetVersionDesc(MQVersion::CURRENT_VERSION)); |
| |
| return runningInfo; |
| } |
| } |
| |
| LOG_ERROR_NEW("no corresponding consumer found for group:{}", consumerGroup); |
| return nullptr; |
| } |
| |
| void MQClientInstance::addBrokerToAddrTable(const std::string& brokerName, |
| const std::map<int, std::string>& brokerAddrs) { |
| std::lock_guard<std::mutex> lock(broker_addr_table_mutex_); |
| broker_addr_table_[brokerName] = brokerAddrs; |
| } |
| |
| void MQClientInstance::resetBrokerAddrTable(BrokerAddrMAP&& table) { |
| std::lock_guard<std::mutex> lock(broker_addr_table_mutex_); |
| broker_addr_table_ = std::forward<BrokerAddrMAP>(table); |
| } |
| |
| void MQClientInstance::clearBrokerAddrTable() { |
| std::lock_guard<std::mutex> lock(broker_addr_table_mutex_); |
| broker_addr_table_.clear(); |
| } |
| |
| MQClientInstance::BrokerAddrMAP MQClientInstance::getBrokerAddrTable() { |
| std::lock_guard<std::mutex> lock(broker_addr_table_mutex_); |
| return broker_addr_table_; |
| } |
| |
| } // namespace rocketmq |