/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "MQClientInstance.h"

#include <typeindex>

#include "ClientRemotingProcessor.h"
#include "Logging.h"
#include "MQAdminImpl.h"
#include "MQClientAPIImpl.h"
#include "MQClientManager.h"
#include "MQVersion.h"
#include "PermName.h"
#include "PullMessageService.hpp"
#include "PullRequest.h"
#include "RebalanceImpl.h"
#include "RebalanceService.h"
#include "TcpRemotingClient.h"
#include "TopicPublishInfo.hpp"
#include "UtilAll.h"
#include "protocol/body/ConsumerRunningInfo.h"

namespace rocketmq {

// Maximum time, in milliseconds, to wait in UtilAll::try_lock_for when acquiring
// lock_namesrv_ or lock_heartbeat_ before giving up.
static const long LOCK_TIMEOUT_MILLIS = 3000L;

// Convenience constructor: delegates to the three-argument constructor with a null RPC hook.
MQClientInstance::MQClientInstance(const MQClientConfig& clientConfig, const std::string& clientId)
    : MQClientInstance(clientConfig, clientId, nullptr) {}

// Builds a client instance identified by clientId.
//
// The rebalance service, pull-message service and scheduled executor are created in the
// initializer list; the body then creates the remoting processor, the client API
// implementation (given rpcHook and clientConfig) and the admin implementation.
// Nothing is started here: the instance is left in the CREATE_JUST state.
MQClientInstance::MQClientInstance(const MQClientConfig& clientConfig, const std::string& clientId, RPCHookPtr rpcHook)
    : client_id_(clientId),
      rebalance_service_(new RebalanceService(this)),
      pull_message_service_(new PullMessageService(this)),
      scheduled_executor_service_("MQClient", false) {
  // Register an empty publish info under the auto-create topic key.
  topic_publish_info_table_[AUTO_CREATE_TOPIC_KEY_TOPIC] = TopicPublishInfoPtr(new TopicPublishInfo());

  // The API implementation borrows the processor through a raw pointer.
  client_remoting_processor_.reset(new ClientRemotingProcessor(this));
  mq_client_api_impl_.reset(new MQClientAPIImpl(client_remoting_processor_.get(), rpcHook, clientConfig));

  // Apply a user-configured name server address, if one was given.
  std::string configuredNamesrv = clientConfig.namesrv_addr();
  if (!configuredNamesrv.empty()) {
    mq_client_api_impl_->updateNameServerAddressList(configuredNamesrv);
    LOG_INFO_NEW("user specified name server address: {}", configuredNamesrv);
  }

  mq_admin_impl_.reset(new MQAdminImpl(this));

  service_state_ = CREATE_JUST;
  LOG_DEBUG_NEW("MQClientInstance construct");
}

// Logs the destruction, then empties the cached tables and drops the client API implementation.
MQClientInstance::~MQClientInstance() {
  LOG_INFO_NEW("MQClientInstance:{} destruct", client_id_);

  // UNNECESSARY: the original author marks this explicit clearing as not required.
  producer_table_.clear();
  consumer_table_.clear();
  topic_publish_info_table_.clear();
  topic_route_table_.clear();
  broker_addr_table_.clear();

  // Release the client API implementation explicitly.
  mq_client_api_impl_ = nullptr;
}

// Returns the remoting client's name server addresses joined into one string,
// each address followed by ';' (so a non-empty result ends with ';').
std::string MQClientInstance::getNamesrvAddr() const {
  std::ostringstream joined;
  auto addrList = mq_client_api_impl_->getRemotingClient()->getNameServerAddressList();
  for (auto it = addrList.begin(); it != addrList.end(); ++it) {
    joined << *it << ";";
  }
  return joined.str();
}

// Converts the route data of `topic` into the producer-side TopicPublishInfo.
//
// - If the route carries an order-topic conf ("broker-a:8;broker-b:8"), one queue per
//   index in [0, nums) is created for every "brokerName:nums" item and the result is
//   flagged as an order topic. Items that do not contain both fields are skipped with
//   a warning instead of being indexed out of bounds.
// - Otherwise, for every writeable queue data whose broker is present in the route and
//   has a master address, write_queue_nums queues are created; the list is then sorted
//   by (queue id, broker name) so consecutive entries alternate between brokers.
//
// Returns the newly built publish info (never null).
TopicPublishInfoPtr MQClientInstance::topicRouteData2TopicPublishInfo(const std::string& topic,
                                                                      TopicRouteDataPtr route) {
  auto info = std::make_shared<TopicPublishInfo>();
  info->setTopicRouteData(route);

  // The queue list is only exposed as const; fill it in place while `info` is still local.
  auto& mqList = const_cast<TopicPublishInfo::QueuesVec&>(info->getMessageQueueList());

  std::string orderTopicConf = route->order_topic_conf();
  if (!orderTopicConf.empty()) {  // order msg
    // "broker-a:8";"broker-b:8"
    std::vector<std::string> brokers;
    UtilAll::Split(brokers, orderTopicConf, ';');
    for (const auto& broker : brokers) {
      std::vector<std::string> item;
      UtilAll::Split(item, broker, ':');
      if (item.size() < 2) {
        // Malformed item (no "name:nums" pair): skip rather than read item[1] out of range.
        LOG_WARN_NEW("MQClientInstance: malformed order topic conf item:{} of topic:{}", broker, topic);
        continue;
      }
      int nums = atoi(item[1].c_str());
      for (int i = 0; i < nums; i++) {
        mqList.emplace_back(topic, item[0], i);
      }
    }
    info->setOrderTopic(true);
  } else {  // no order msg
    const auto& qds = route->queue_datas();
    for (const auto& qd : qds) {
      if (!PermName::isWriteable(qd.perm())) {
        continue;
      }

      // Locate the broker data matching this queue data's broker name.
      const BrokerData* brokerData = nullptr;
      for (const auto& bd : route->broker_datas()) {
        if (bd.broker_name() == qd.broker_name()) {
          brokerData = &bd;
          break;
        }
      }

      if (nullptr == brokerData) {
        LOG_WARN_NEW("MQClientInstance: broker:{} of topic:{} have not data", qd.broker_name(), topic);
        continue;
      }

      // Producers can only write to a master node.
      if (brokerData->broker_addrs().find(MASTER_ID) == brokerData->broker_addrs().end()) {
        LOG_WARN_NEW("MQClientInstance: broker:{} of topic:{} have not master node", qd.broker_name(), topic);
        continue;
      }

      for (int i = 0; i < qd.write_queue_nums(); i++) {
        mqList.emplace_back(topic, qd.broker_name(), i);
      }
    }

    // sort, make brokerName is staggered.
    // Compare directly instead of subtracting ids, which avoids any signed overflow.
    std::sort(mqList.begin(), mqList.end(), [](const MQMessageQueue& a, const MQMessageQueue& b) {
      if (a.queue_id() != b.queue_id()) {
        return a.queue_id() < b.queue_id();
      }
      return a.broker_name().compare(b.broker_name()) < 0;
    });

    info->setOrderTopic(false);
  }

  return info;
}

std::vector<MQMessageQueue> MQClientInstance::topicRouteData2TopicSubscribeInfo(const std::string& topic,
                                                                                TopicRouteDataPtr route) {
  std::vector<MQMessageQueue> mqList;
  const auto& queueDatas = route->queue_datas();
  for (const auto& qd : queueDatas) {
    if (PermName::isReadable(qd.perm())) {
      for (int i = 0; i < qd.read_queue_nums(); i++) {
        MQMessageQueue mq(topic, qd.broker_name(), i);
        mqList.push_back(mq);
      }
    }
  }
  return mqList;
}

// Starts the client instance if it has just been created.
//
// From CREATE_JUST: starts the client API implementation, the scheduled tasks, the pull
// service and the rebalance service, in that order, and moves to RUNNING. The state is
// set to START_FAILED first, so it stays START_FAILED if any step throws.
// From any other state only a log line is written.
void MQClientInstance::start() {
  switch (service_state_) {
    case CREATE_JUST:
      LOG_INFO_NEW("the client instance [{}] is starting", client_id_);
      service_state_ = START_FAILED;

      mq_client_api_impl_->start();

      // start various schedule tasks
      startScheduledTask();

      // start pull service
      pull_message_service_->start();

      // start rebalance service
      rebalance_service_->start();

      LOG_INFO_NEW("the client instance [{}] start OK", client_id_);
      service_state_ = RUNNING;
      break;
    case RUNNING:
      // One placeholder, one argument (the original passed a surplus state argument).
      LOG_INFO_NEW("the client instance [{}] already running.", client_id_);
      break;
    case SHUTDOWN_ALREADY:
    case START_FAILED:
      LOG_INFO_NEW("the client instance [{}] start failed with fault state:{}", client_id_, service_state_);
      break;
    default:
      break;
  }
}

// Shuts the instance down once no consumer and no producer is registered any more.
//
// Only a RUNNING instance is torn down: the state becomes SHUTDOWN_ALREADY, the pull
// service, scheduled executor, client API implementation and rebalance service are shut
// down in that order, and the instance is removed from MQClientManager.
void MQClientInstance::shutdown() {
  // Still in use by a consumer or a producer: nothing to do yet.
  if (getConsumerTableSize() != 0 || getProducerTableSize() != 0) {
    return;
  }

  // Every state other than RUNNING is a no-op.
  if (service_state_ != RUNNING) {
    return;
  }

  service_state_ = SHUTDOWN_ALREADY;
  pull_message_service_->shutdown();
  scheduled_executor_service_.shutdown();
  mq_client_api_impl_->shutdown();
  rebalance_service_->shutdown();

  MQClientManager::getInstance()->removeMQClientInstance(client_id_);
  LOG_INFO_NEW("the client instance [{}] shutdown OK", client_id_);
}

// Returns true only while the service state is RUNNING.
bool MQClientInstance::isRunning() {
  return service_state_ == RUNNING;
}

void MQClientInstance::startScheduledTask() {
  LOG_INFO_NEW("start scheduled task:{}", client_id_);
  scheduled_executor_service_.startup();

  // updateTopicRouteInfoFromNameServer
  scheduled_executor_service_.schedule(std::bind(&MQClientInstance::updateTopicRouteInfoPeriodically, this), 10,
                                       time_unit::milliseconds);

  // sendHeartbeatToAllBroker
  scheduled_executor_service_.schedule(std::bind(&MQClientInstance::sendHeartbeatToAllBrokerPeriodically, this), 1000,
                                       time_unit::milliseconds);

  // persistAllConsumerOffset
  scheduled_executor_service_.schedule(std::bind(&MQClientInstance::persistAllConsumerOffsetPeriodically, this),
                                       1000 * 10, time_unit::milliseconds);
}

// Periodic task: refreshes the route info of all known topics, then re-schedules
// itself to run again in 30000 ms.
void MQClientInstance::updateTopicRouteInfoPeriodically() {
  updateTopicRouteInfoFromNameServer();

  // Queue the next round.
  scheduled_executor_service_.schedule([this]() { updateTopicRouteInfoPeriodically(); }, 1000 * 30,
                                       time_unit::milliseconds);
}

// Periodic task: drops offline brokers, sends a heartbeat to every broker, then
// re-schedules itself to run again in 30000 ms.
void MQClientInstance::sendHeartbeatToAllBrokerPeriodically() {
  cleanOfflineBroker();
  sendHeartbeatToAllBrokerWithLock();

  // Queue the next round.
  scheduled_executor_service_.schedule([this]() { sendHeartbeatToAllBrokerPeriodically(); }, 1000 * 30,
                                       time_unit::milliseconds);
}

// Periodic task: persists the offsets of all registered consumers, then re-schedules
// itself to run again in 5000 ms.
void MQClientInstance::persistAllConsumerOffsetPeriodically() {
  persistAllConsumerOffset();

  // Queue the next round.
  scheduled_executor_service_.schedule([this]() { persistAllConsumerOffsetPeriodically(); }, 1000 * 5,
                                       time_unit::milliseconds);
}

// Returns a reference to this instance's client id (valid as long as the instance lives).
const std::string& MQClientInstance::getClientId() const {
  return client_id_;
}

// Refreshes the route info of every topic known to this instance: the union of all
// consumer subscriptions and all entries of the publish info table.
void MQClientInstance::updateTopicRouteInfoFromNameServer() {
  std::set<std::string> topics;

  // Collect from consumers first, then from producers; the set removes duplicates.
  getTopicListFromConsumerSubscription(topics);
  getTopicListFromTopicPublishInfo(topics);

  for (auto it = topics.begin(); it != topics.end(); ++it) {
    updateTopicRouteInfoFromNameServer(*it);
  }
}

// Removes broker addresses that no longer appear in any cached topic route.
//
// Works on a copy of the broker address table under lock_namesrv_ (acquired with a
// LOCK_TIMEOUT_MILLIS timeout; on timeout only a warning is logged). Addresses missing
// from the topic route table are dropped, brokers left without any address are dropped
// entirely, and - if anything was removed - the pruned table replaces the live one and
// cached topic->broker entries pointing at removed addresses are erased.
void MQClientInstance::cleanOfflineBroker() {
  if (!UtilAll::try_lock_for(lock_namesrv_, LOCK_TIMEOUT_MILLIS)) {
    LOG_WARN_NEW("lock namesrv, but failed.");
    return;
  }
  std::lock_guard<std::timed_mutex> namesrvLock(lock_namesrv_, std::adopt_lock);

  std::set<std::string> offlineBrokers;
  BrokerAddrMAP updatedTable(getBrokerAddrTable());
  for (auto itBrokerTable = updatedTable.begin(); itBrokerTable != updatedTable.end();) {
    const auto& brokerName = itBrokerTable->first;
    auto& cloneAddrTable = itBrokerTable->second;

    for (auto it = cloneAddrTable.begin(); it != cloneAddrTable.end();) {
      const auto& addr = it->second;
      if (!isBrokerAddrExistInTopicRouteTable(addr)) {
        offlineBrokers.insert(addr);
        // Log BEFORE erasing: `addr` refers into the element that erase() destroys.
        LOG_INFO_NEW("the broker addr[{} {}] is offline, remove it", brokerName, addr);
        it = cloneAddrTable.erase(it);
      } else {
        ++it;
      }
    }

    if (cloneAddrTable.empty()) {
      // Log BEFORE erasing: `brokerName` refers into the element that erase() destroys.
      LOG_INFO_NEW("the broker[{}] name's host is offline, remove it", brokerName);
      itBrokerTable = updatedTable.erase(itBrokerTable);
    } else {
      ++itBrokerTable;
    }
  }

  if (!offlineBrokers.empty()) {
    resetBrokerAddrTable(std::move(updatedTable));

    // Forget cached topic->broker choices that point at an address just removed.
    std::lock_guard<std::mutex> topicAddrLock(topic_broker_addr_table_mutex_);
    for (auto it = topic_broker_addr_table_.begin(); it != topic_broker_addr_table_.end();) {
      if (offlineBrokers.find(it->second.first) != offlineBrokers.end()) {
        it = topic_broker_addr_table_.erase(it);
      } else {
        ++it;
      }
    }
  }
}

// Returns true if `addr` is listed as a broker address in the route data of any
// cached topic. Holds topic_route_table_mutex_ for the duration of the scan.
bool MQClientInstance::isBrokerAddrExistInTopicRouteTable(const std::string& addr) {
  std::lock_guard<std::mutex> guard(topic_route_table_mutex_);
  for (const auto& routeEntry : topic_route_table_) {
    const auto& route = routeEntry.second;
    for (const auto& brokerData : route->broker_datas()) {
      const auto& addrsById = brokerData.broker_addrs();
      for (auto addrIt = addrsById.begin(); addrIt != addrsById.end(); ++addrIt) {
        if (addrIt->second == addr) {
          return true;
        }
      }
    }
  }
  return false;
}

// Sends a heartbeat to all brokers if lock_heartbeat_ can be taken without waiting;
// otherwise logs a warning and skips this round.
void MQClientInstance::sendHeartbeatToAllBrokerWithLock() {
  if (!lock_heartbeat_.try_lock()) {
    LOG_WARN_NEW("lock heartBeat, but failed.");
    return;
  }

  // The mutex is already held; adopt it so it is released on scope exit.
  std::lock_guard<std::timed_mutex> heartbeatGuard(lock_heartbeat_, std::adopt_lock);
  sendHeartbeatToAllBroker();
}

// Asks every registered consumer to persist its consume offsets, holding
// consumer_table_mutex_ throughout. A debug line is logged once per consumer.
void MQClientInstance::persistAllConsumerOffset() {
  std::lock_guard<std::mutex> guard(consumer_table_mutex_);
  for (auto entry = consumer_table_.begin(); entry != consumer_table_.end(); ++entry) {
    LOG_DEBUG_NEW("the client instance [{}] start persistAllConsumerOffset", client_id_);
    entry->second->persistConsumerOffset();
  }
}

void MQClientInstance::sendHeartbeatToAllBroker() {
  std::unique_ptr<HeartbeatData> heartbeatData(prepareHeartbeatData());
  bool producerEmpty = heartbeatData->producer_data_set().empty();
  bool consumerEmpty = heartbeatData->consumer_data_set().empty();
  if (producerEmpty && consumerEmpty) {
    LOG_WARN_NEW("sending heartbeat, but no consumer and no producer");
    return;
  }

  auto brokerAddrTable = getBrokerAddrTable();
  if (!brokerAddrTable.empty()) {
    for (const auto& it : brokerAddrTable) {
      // const auto& brokerName = it.first;
      const auto& oneTable = it.second;
      for (const auto& it2 : oneTable) {
        const auto id = it2.first;
        const auto& addr = it2.second;
        if (consumerEmpty && id != MASTER_ID) {
          continue;
        }

        try {
          mq_client_api_impl_->sendHearbeat(addr, heartbeatData.get(), 3000);
        } catch (const MQException& e) {
          LOG_ERROR_NEW("{}", e.what());
        }
      }
    }
    brokerAddrTable.clear();
  } else {
    LOG_WARN_NEW("sendheartbeat brokerAddrTable is empty");
  }
}

// Fetches the route data of `topic` from the name server and, if it differs from the
// cached one, updates the broker address table, the producer publish info, the consumer
// subscribe info (when consumers exist) and the cached route itself.
//
// When isDefault is true the route of AUTO_CREATE_TOPIC_KEY_TOPIC is fetched instead and
// its read/write queue numbers are capped at 4 before being applied to `topic`.
//
// Runs under lock_namesrv_, acquired with a LOCK_TIMEOUT_MILLIS timeout.
// Returns true when route data was obtained (changed or not); false on lock timeout,
// on a null route, or when fetching/applying threw a std::exception (which is swallowed
// and, except for retry topics and the auto-create key topic, logged).
bool MQClientInstance::updateTopicRouteInfoFromNameServer(const std::string& topic, bool isDefault) {
  if (!UtilAll::try_lock_for(lock_namesrv_, LOCK_TIMEOUT_MILLIS)) {
    LOG_WARN_NEW("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS);
    return false;
  }
  std::lock_guard<std::timed_mutex> lock(lock_namesrv_, std::adopt_lock);
  LOG_DEBUG_NEW("updateTopicRouteInfoFromNameServer start:{}", topic);

  try {
    TopicRouteDataPtr topicRouteData;
    if (isDefault) {
      topicRouteData = mq_client_api_impl_->getTopicRouteInfoFromNameServer(AUTO_CREATE_TOPIC_KEY_TOPIC, 1000 * 3);
      if (topicRouteData != nullptr) {
        auto& queueDatas = topicRouteData->queue_datas();
        for (auto& qd : queueDatas) {
          int queueNums = std::min(4, qd.read_queue_nums());
          qd.set_read_queue_nums(queueNums);
          qd.set_write_queue_nums(queueNums);
        }
      } else {
        // Only report "null" when the default route really was null.
        LOG_DEBUG_NEW("getTopicRouteInfoFromNameServer is null for topic: {}", topic);
      }
    } else {
      topicRouteData = mq_client_api_impl_->getTopicRouteInfoFromNameServer(topic, 1000 * 3);
    }

    if (topicRouteData == nullptr) {
      LOG_WARN_NEW("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}",
                   topic);
      return false;
    }

    LOG_INFO_NEW("updateTopicRouteInfoFromNameServer has data");
    auto old = getTopicRouteData(topic);
    bool changed = topicRouteDataIsChange(old.get(), topicRouteData.get());

    if (changed) {
      LOG_INFO_NEW("updateTopicRouteInfoFromNameServer changed:{}", topic);

      // update broker addr
      const auto& brokerDatas = topicRouteData->broker_datas();
      for (const auto& bd : brokerDatas) {
        LOG_INFO_NEW("updateTopicRouteInfoFromNameServer changed with broker name:{}", bd.broker_name());
        addBrokerToAddrTable(bd.broker_name(), bd.broker_addrs());
      }

      // update publish info
      {
        TopicPublishInfoPtr publishInfo(topicRouteData2TopicPublishInfo(topic, topicRouteData));
        updateProducerTopicPublishInfo(topic, publishInfo);
      }

      // update subscribe info
      if (getConsumerTableSize() > 0) {
        std::vector<MQMessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
        updateConsumerTopicSubscribeInfo(topic, subscribeInfo);
      }

      // Cache the route last, after all derived tables have been updated.
      addTopicRouteData(topic, topicRouteData);
    }

    LOG_DEBUG_NEW("updateTopicRouteInfoFromNameServer end:{}", topic);
    return true;
  } catch (const std::exception& e) {
    if (!UtilAll::isRetryTopic(topic) && topic != AUTO_CREATE_TOPIC_KEY_TOPIC) {
      LOG_WARN_NEW("updateTopicRouteInfoFromNameServer Exception, {}", e.what());
    }
  }

  return false;
}

std::unique_ptr<HeartbeatData> MQClientInstance::prepareHeartbeatData() {
  std::unique_ptr<HeartbeatData> heartbeat_data(new HeartbeatData());

  // clientID
  heartbeat_data->set_client_id(client_id_);

  // Consumer
  insertConsumerInfoToHeartBeatData(heartbeat_data.get());

  // Producer
  insertProducerInfoToHeartBeatData(heartbeat_data.get());

  return heartbeat_data;
}

// Appends one consumer data entry (group name, consume type, message model,
// consume-from-where, subscriptions) per registered consumer to heartbeatData.
// Holds consumer_table_mutex_ while iterating.
void MQClientInstance::insertConsumerInfoToHeartBeatData(HeartbeatData* heartbeatData) {
  std::lock_guard<std::mutex> guard(consumer_table_mutex_);
  for (auto entry = consumer_table_.begin(); entry != consumer_table_.end(); ++entry) {
    const auto* inner = entry->second;
    // TODO: unitMode
    heartbeatData->consumer_data_set().emplace_back(inner->groupName(), inner->consumeType(), inner->messageModel(),
                                                    inner->consumeFromWhere(), inner->subscriptions());
  }
}

// Appends one producer data entry per registered producer group to heartbeatData,
// built from the group name (the key of producer_table_). Holds producer_table_mutex_.
void MQClientInstance::insertProducerInfoToHeartBeatData(HeartbeatData* heartbeatData) {
  std::lock_guard<std::mutex> guard(producer_table_mutex_);
  for (auto entry = producer_table_.begin(); entry != producer_table_.end(); ++entry) {
    heartbeatData->producer_data_set().emplace_back(entry->first);
  }
}

// Returns true when the route data should be treated as changed: either pointer is
// null, or the two objects do not compare equal with operator==.
bool MQClientInstance::topicRouteDataIsChange(TopicRouteData* olddata, TopicRouteData* nowdata) {
  const bool bothPresent = (olddata != nullptr) && (nowdata != nullptr);
  if (bothPresent) {
    return !(*olddata == *nowdata);
  }
  return true;
}

// Looks up the cached route data of `topic` under topic_route_table_mutex_.
// Returns nullptr when the topic has no cached route.
TopicRouteDataPtr MQClientInstance::getTopicRouteData(const std::string& topic) {
  std::lock_guard<std::mutex> guard(topic_route_table_mutex_);
  auto found = topic_route_table_.find(topic);
  if (found == topic_route_table_.end()) {
    return nullptr;
  }
  return found->second;
}

// Stores topicRouteData as the cached route of `topic`, replacing any previous entry.
// Guarded by topic_route_table_mutex_.
void MQClientInstance::addTopicRouteData(const std::string& topic, TopicRouteDataPtr topicRouteData) {
  std::lock_guard<std::mutex> lock(topic_route_table_mutex_);
  topic_route_table_[topic] = topicRouteData;
}

// Registers `consumer` under `group`.
// Returns false when `group` is empty or already registered, true otherwise.
bool MQClientInstance::registerConsumer(const std::string& group, MQConsumerInner* consumer) {
  if (group.empty()) {
    return false;
  }

  const bool added = addConsumerToTable(group, consumer);
  if (added) {
    LOG_DEBUG_NEW("registerConsumer success:{}", group);
  } else {
    LOG_WARN_NEW("the consumer group[{}] exist already.", group);
  }
  return added;
}

// Removes the consumer `group` from the local table, then unregisters it through
// unregisterClientWithLock, passing `null` as the producer group.
void MQClientInstance::unregisterConsumer(const std::string& group) {
  eraseConsumerFromTable(group);
  unregisterClientWithLock(null, group);
}

// Unregisters the given producer/consumer group from all brokers while holding
// lock_heartbeat_ (acquired with a LOCK_TIMEOUT_MILLIS timeout). On timeout only a
// warning is logged. Any std::exception from the unregistration is logged and swallowed.
void MQClientInstance::unregisterClientWithLock(const std::string& producerGroup, const std::string& consumerGroup) {
  if (!UtilAll::try_lock_for(lock_heartbeat_, LOCK_TIMEOUT_MILLIS)) {
    LOG_WARN_NEW("lock heartBeat, but failed.");
    return;
  }

  // Already locked above; adopt so the mutex is released on every exit path.
  std::lock_guard<std::timed_mutex> heartbeatGuard(lock_heartbeat_, std::adopt_lock);
  try {
    unregisterClient(producerGroup, consumerGroup);
  } catch (const std::exception& e) {
    LOG_ERROR_NEW("unregisterClient exception: {}", e.what());
  }
}

void MQClientInstance::unregisterClient(const std::string& producerGroup, const std::string& consumerGroup) {
  BrokerAddrMAP brokerAddrTable(getBrokerAddrTable());
  for (const auto& it : brokerAddrTable) {
    const auto& brokerName = it.first;
    const auto& oneTable = it.second;
    for (const auto& it2 : oneTable) {
      const auto& index = it2.first;
      const auto& addr = it2.second;
      try {
        mq_client_api_impl_->unregisterClient(addr, client_id_, producerGroup, consumerGroup);
        LOG_INFO_NEW("unregister client[Producer: {} Consumer: {}] from broker[{} {} {}] success", producerGroup,
                     consumerGroup, brokerName, index, addr);
      } catch (const std::exception& e) {
        LOG_ERROR_NEW("unregister client exception from broker: {}. EXCEPTION: {}", addr, e.what());
      }
    }
  }
}

// Registers `producer` under `group`.
// Returns false when `group` is empty or already registered, true otherwise.
bool MQClientInstance::registerProducer(const std::string& group, MQProducerInner* producer) {
  if (group.empty()) {
    return false;
  }

  if (!addProducerToTable(group, producer)) {
    // This is the producer path: name the producer group in the warning.
    LOG_WARN_NEW("the producer group[{}] exist already.", group);
    return false;
  }

  LOG_DEBUG_NEW("registerProducer success:{}", group);
  return true;
}

// Removes the producer `group` from the local table, then unregisters it through
// unregisterClientWithLock, passing `null` as the consumer group.
void MQClientInstance::unregisterProducer(const std::string& group) {
  eraseProducerFromTable(group);
  unregisterClientWithLock(group, null);
}

// Wakes up the rebalance service.
void MQClientInstance::rebalanceImmediately() {
  rebalance_service_->wakeup();
}

// Runs doRebalance() on every registered consumer, holding consumer_table_mutex_
// while iterating. Start and finish are logged even when there are no consumers.
void MQClientInstance::doRebalance() {
  LOG_INFO_NEW("the client instance:{} start doRebalance", client_id_);
  if (getConsumerTableSize() > 0) {
    std::lock_guard<std::mutex> guard(consumer_table_mutex_);
    for (auto entry = consumer_table_.begin(); entry != consumer_table_.end(); ++entry) {
      entry->second->doRebalance();
    }
  }
  LOG_INFO_NEW("the client instance [{}] finish doRebalance", client_id_);
}

// Runs doRebalance() on the consumer registered under consumerGroup, if any, while
// holding consumer_table_mutex_. A std::exception raised by the rebalance is logged
// and swallowed.
void MQClientInstance::doRebalanceByConsumerGroup(const std::string& consumerGroup) {
  std::lock_guard<std::mutex> guard(consumer_table_mutex_);
  auto found = consumer_table_.find(consumerGroup);
  if (found == consumer_table_.end()) {
    return;
  }

  try {
    LOG_INFO_NEW("the client instance [{}] start doRebalance for consumer [{}]", client_id_, consumerGroup);
    found->second->doRebalance();
  } catch (const std::exception& e) {
    LOG_ERROR_NEW("{}", e.what());
  }
}

// Returns the producer registered under producerName, or nullptr if there is none.
// The lookup is guarded by producer_table_mutex_.
MQProducerInner* MQClientInstance::selectProducer(const std::string& producerName) {
  std::lock_guard<std::mutex> guard(producer_table_mutex_);
  auto found = producer_table_.find(producerName);
  if (found == producer_table_.end()) {
    return nullptr;
  }
  return found->second;
}

// Inserts `producer` under producerName unless that name is already present.
// Returns true on insertion, false when the name was taken. Guarded by producer_table_mutex_.
bool MQClientInstance::addProducerToTable(const std::string& producerName, MQProducerInner* producer) {
  std::lock_guard<std::mutex> guard(producer_table_mutex_);
  const bool alreadyPresent = producer_table_.find(producerName) != producer_table_.end();
  if (!alreadyPresent) {
    producer_table_[producerName] = producer;
  }
  return !alreadyPresent;
}

// Removes the entry for producerName from the producer table, if present.
// Guarded by producer_table_mutex_.
void MQClientInstance::eraseProducerFromTable(const std::string& producerName) {
  std::lock_guard<std::mutex> guard(producer_table_mutex_);
  auto found = producer_table_.find(producerName);
  if (found == producer_table_.end()) {
    return;
  }
  producer_table_.erase(found);
}

// Returns the number of registered producers (read under producer_table_mutex_).
int MQClientInstance::getProducerTableSize() {
  std::lock_guard<std::mutex> lock(producer_table_mutex_);
  return producer_table_.size();
}

// Adds every topic that has an entry in the publish info table to topicList.
// Guarded by topic_publish_info_table_mutex_.
void MQClientInstance::getTopicListFromTopicPublishInfo(std::set<std::string>& topicList) {
  std::lock_guard<std::mutex> guard(topic_publish_info_table_mutex_);
  for (auto entry = topic_publish_info_table_.begin(); entry != topic_publish_info_table_.end(); ++entry) {
    topicList.insert(entry->first);
  }
}

// Stores publishInfo as the publish info of `topic`; forwards to addTopicInfoToTable.
void MQClientInstance::updateProducerTopicPublishInfo(const std::string& topic, TopicPublishInfoPtr publishInfo) {
  addTopicInfoToTable(topic, publishInfo);
}

// Returns the consumer registered under `group`, or nullptr if there is none.
// The lookup is guarded by consumer_table_mutex_.
MQConsumerInner* MQClientInstance::selectConsumer(const std::string& group) {
  std::lock_guard<std::mutex> guard(consumer_table_mutex_);
  auto found = consumer_table_.find(group);
  if (found == consumer_table_.end()) {
    return nullptr;
  }
  return found->second;
}

// Inserts `consumer` under consumerName unless that name is already present.
// Returns true on insertion, false when the name was taken. Guarded by consumer_table_mutex_.
bool MQClientInstance::addConsumerToTable(const std::string& consumerName, MQConsumerInner* consumer) {
  std::lock_guard<std::mutex> guard(consumer_table_mutex_);
  const bool alreadyPresent = consumer_table_.find(consumerName) != consumer_table_.end();
  if (!alreadyPresent) {
    consumer_table_[consumerName] = consumer;
  }
  return !alreadyPresent;
}

// Removes the entry for consumerName from the consumer table; logs a warning when the
// name is not registered. The consumer object itself is not freed (per the original
// comment it was allocated by the user). Guarded by consumer_table_mutex_.
void MQClientInstance::eraseConsumerFromTable(const std::string& consumerName) {
  std::lock_guard<std::mutex> guard(consumer_table_mutex_);
  auto found = consumer_table_.find(consumerName);
  if (found == consumer_table_.end()) {
    LOG_WARN_NEW("could not find consumer:{} from table", consumerName);
    return;
  }
  consumer_table_.erase(found);
}

// Returns the number of registered consumers (read under consumer_table_mutex_).
int MQClientInstance::getConsumerTableSize() {
  std::lock_guard<std::mutex> lock(consumer_table_mutex_);
  return consumer_table_.size();
}

void MQClientInstance::getTopicListFromConsumerSubscription(std::set<std::string>& topicList) {
  std::lock_guard<std::mutex> lock(consumer_table_mutex_);
  for (const auto& it : consumer_table_) {
    std::vector<SubscriptionData> result = it.second->subscriptions();
    for (const auto& sd : result) {
      topicList.insert(sd.topic());
    }
  }
}

// Hands the subscribe info (readable queues) of `topic` to every registered consumer.
// Guarded by consumer_table_mutex_.
void MQClientInstance::updateConsumerTopicSubscribeInfo(const std::string& topic,
                                                        std::vector<MQMessageQueue> subscribeInfo) {
  std::lock_guard<std::mutex> guard(consumer_table_mutex_);
  for (auto entry = consumer_table_.begin(); entry != consumer_table_.end(); ++entry) {
    entry->second->updateTopicSubscribeInfo(topic, subscribeInfo);
  }
}

// Stores topicPublishInfo as the publish info of `topic`, replacing any previous entry.
// Guarded by topic_publish_info_table_mutex_.
void MQClientInstance::addTopicInfoToTable(const std::string& topic, TopicPublishInfoPtr topicPublishInfo) {
  std::lock_guard<std::mutex> lock(topic_publish_info_table_mutex_);
  topic_publish_info_table_[topic] = topicPublishInfo;
}

// Removes the publish info of `topic` from the table, if present.
// Guarded by topic_publish_info_table_mutex_.
void MQClientInstance::eraseTopicInfoFromTable(const std::string& topic) {
  std::lock_guard<std::mutex> guard(topic_publish_info_table_mutex_);
  auto found = topic_publish_info_table_.find(topic);
  if (found == topic_publish_info_table_.end()) {
    return;
  }
  topic_publish_info_table_.erase(found);
}

// Returns the publish info stored for `topic`, or nullptr if there is none.
// Guarded by topic_publish_info_table_mutex_.
TopicPublishInfoPtr MQClientInstance::getTopicPublishInfoFromTable(const std::string& topic) {
  std::lock_guard<std::mutex> guard(topic_publish_info_table_mutex_);
  auto found = topic_publish_info_table_.find(topic);
  if (found == topic_publish_info_table_.end()) {
    return nullptr;
  }
  return found->second;
}

// Returns true if the publish info table has an entry for `topic` (presence only;
// the entry's content is not inspected). Guarded by topic_publish_info_table_mutex_.
bool MQClientInstance::isTopicInfoValidInTable(const std::string& topic) {
  std::lock_guard<std::mutex> lock(topic_publish_info_table_mutex_);
  return topic_publish_info_table_.find(topic) != topic_publish_info_table_.end();
}

// Returns usable publish info for `topic`.
//
// Uses the cached entry when present; otherwise refreshes the route from the name
// server once. If the result is still missing or not ok(), refreshes again using the
// default (auto-create) topic route and returns whatever is then in the table, which
// may be nullptr.
TopicPublishInfoPtr MQClientInstance::tryToFindTopicPublishInfo(const std::string& topic) {
  auto info = getTopicPublishInfoFromTable(topic);
  if (info == nullptr) {
    updateTopicRouteInfoFromNameServer(topic);
    info = getTopicPublishInfoFromTable(topic);
  }

  const bool usable = (info != nullptr) && info->ok();
  if (usable) {
    return info;
  }

  LOG_INFO_NEW("updateTopicRouteInfoFromNameServer with default");
  updateTopicRouteInfoFromNameServer(topic, true);
  return getTopicPublishInfoFromTable(topic);
}

std::unique_ptr<FindBrokerResult> MQClientInstance::findBrokerAddressInAdmin(const std::string& brokerName) {
  BrokerAddrMAP brokerTable(getBrokerAddrTable());
  bool found = false;
  bool slave = false;
  std::string brokerAddr;

  const auto& it = brokerTable.find(brokerName);
  if (it != brokerTable.end()) {
    const auto& brokerMap = it->second;
    const auto& it1 = brokerMap.begin();
    if (it1 != brokerMap.end()) {
      slave = (it1->first != MASTER_ID);
      found = true;
      brokerAddr = it1->second;
    }
  }

  brokerTable.clear();
  if (found) {
    return std::unique_ptr<FindBrokerResult>(new FindBrokerResult(brokerAddr, slave));
  }

  return nullptr;
}

std::string MQClientInstance::findBrokerAddressInPublish(const std::string& brokerName) {
  BrokerAddrMAP brokerTable(getBrokerAddrTable());
  std::string brokerAddr;
  bool found = false;

  const auto& it = brokerTable.find(brokerName);
  if (it != brokerTable.end()) {
    const auto& brokerMap = it->second;
    const auto& it1 = brokerMap.find(MASTER_ID);
    if (it1 != brokerMap.end()) {
      brokerAddr = it1->second;
      found = true;
    }
  }

  brokerTable.clear();
  if (found) {
    return brokerAddr;
  }

  return null;
}

std::unique_ptr<FindBrokerResult> MQClientInstance::findBrokerAddressInSubscribe(const std::string& brokerName,
                                                                                 int brokerId,
                                                                                 bool onlyThisBroker) {
  std::string brokerAddr;
  bool slave = false;
  bool found = false;
  BrokerAddrMAP brokerTable(getBrokerAddrTable());

  const auto& it = brokerTable.find(brokerName);
  if (it != brokerTable.end()) {
    const auto& brokerMap = it->second;
    if (!brokerMap.empty()) {
      const auto& it1 = brokerMap.find(brokerId);
      if (it1 != brokerMap.end()) {
        brokerAddr = it1->second;
        slave = it1->first != MASTER_ID;
        found = true;
      } else if (!onlyThisBroker) {  // not only from master
        const auto& it2 = brokerMap.begin();
        brokerAddr = it2->second;
        slave = it2->first != MASTER_ID;
        found = true;
      }
    }
  }

  brokerTable.clear();

  if (found) {
    return std::unique_ptr<FindBrokerResult>(new FindBrokerResult(brokerAddr, slave));
  }

  return nullptr;
}

// Fills `cids` with the consumer ids of `group`, queried from a broker serving `topic`.
//
// The broker address chosen for a topic is cached together with a timestamp and reused
// while it is younger than 120000 ms. When no fresh cached address exists, one is
// selected from the topic's route data (refreshing the route once if needed) and cached.
// If the query throws an MQException, the error is logged and the cached address for
// the topic is dropped. When no broker address can be found, `cids` is left untouched.
void MQClientInstance::findConsumerIds(const std::string& topic,
                                       const std::string& group,
                                       std::vector<std::string>& cids) {
  std::string brokerAddr;

  // Try the cached broker for this topic first.
  {
    std::lock_guard<std::mutex> guard(topic_broker_addr_table_mutex_);
    auto cached = topic_broker_addr_table_.find(topic);
    if (cached != topic_broker_addr_table_.end() &&
        UtilAll::currentTimeMillis() < cached->second.second + 120000) {
      brokerAddr = cached->second.first;
    }
  }

  // No fresh cached address: select a new one and remember it.
  if (brokerAddr.empty()) {
    brokerAddr = findBrokerAddrByTopic(topic);
    if (brokerAddr.empty()) {
      updateTopicRouteInfoFromNameServer(topic);
      brokerAddr = findBrokerAddrByTopic(topic);
    }

    if (!brokerAddr.empty()) {
      std::lock_guard<std::mutex> guard(topic_broker_addr_table_mutex_);
      topic_broker_addr_table_[topic] = std::make_pair(brokerAddr, UtilAll::currentTimeMillis());
    }
  }

  if (brokerAddr.empty()) {
    return;
  }

  try {
    LOG_INFO_NEW("getConsumerIdList from broker:{}", brokerAddr);
    mq_client_api_impl_->getConsumerIdListByGroup(brokerAddr, group, cids, 5000);
    return;
  } catch (const MQException& e) {
    LOG_ERROR_NEW("encounter exception when getConsumerIdList: {}", e.what());

    // The cached broker failed; forget it so a new one is selected next time.
    std::lock_guard<std::mutex> guard(topic_broker_addr_table_mutex_);
    topic_broker_addr_table_.erase(topic);
  }
}

std::string MQClientInstance::findBrokerAddrByTopic(const std::string& topic) {
  auto topicRouteData = getTopicRouteData(topic);
  if (topicRouteData != nullptr) {
    return topicRouteData->selectBrokerAddr();
  }
  return "";
}

// Resets the consume offsets of consumer `group` for the queues listed in offsetTable.
//
// Only acts when the consumer registered under `group` is exactly a
// DefaultMQPushConsumerImpl (checked through typeid); otherwise it logs and returns.
// The consumer is suspended while the reset runs and resumed on both the normal and the
// exceptional path; exceptions are rethrown after resuming.
void MQClientInstance::resetOffset(const std::string& group,
                                   const std::string& topic,
                                   const std::map<MQMessageQueue, int64_t>& offsetTable) {
  DefaultMQPushConsumerImpl* consumer = nullptr;
  try {
    auto* impl = selectConsumer(group);
    // Exact dynamic-type match is required, which makes the static_cast below valid.
    if (impl != nullptr && std::type_index(typeid(*impl)) == std::type_index(typeid(DefaultMQPushConsumerImpl))) {
      consumer = static_cast<DefaultMQPushConsumerImpl*>(impl);
    } else {
      LOG_INFO_NEW("[reset-offset] consumer dose not exist. group={}", group);
      return;
    }
    consumer->suspend();

    // `auto` takes a copy of the process queue table, so both loops below iterate the same snapshot.
    auto processQueueTable = consumer->getRebalanceImpl()->getProcessQueueTable();
    // Phase 1: mark the affected queues of `topic` as dropped and discard their messages.
    for (const auto& it : processQueueTable) {
      const auto& mq = it.first;
      if (topic == mq.topic() && offsetTable.find(mq) != offsetTable.end()) {
        auto pq = it.second;
        pq->set_dropped(true);
        pq->clearAllMsgs();
      }
    }

    // Wait 10 seconds before writing the new offsets - presumably to let in-flight
    // consumption settle; TODO confirm.
    std::this_thread::sleep_for(std::chrono::seconds(10));

    // Phase 2: write the new offsets and remove the queues from the rebalance state.
    // NOTE(review): unlike phase 1, this loop does not compare mq.topic() with `topic`;
    // it relies on offsetTable membership only - confirm this is intended.
    for (const auto& it : processQueueTable) {
      const auto& mq = it.first;
      const auto& it2 = offsetTable.find(mq);
      if (it2 != offsetTable.end()) {
        auto offset = it2->second;
        consumer->updateConsumeOffset(mq, offset);
        consumer->getRebalanceImpl()->removeUnnecessaryMessageQueue(mq, it.second);
        consumer->getRebalanceImpl()->removeProcessQueueDirectly(mq);
      }
    }
  } catch (...) {
    // Always resume a suspended consumer before propagating the error.
    if (consumer != nullptr) {
      consumer->resume();
    }
    throw;
  }
  // Normal path: resume the consumer suspended above.
  if (consumer != nullptr) {
    consumer->resume();
  }
}

std::unique_ptr<ConsumerRunningInfo> MQClientInstance::consumerRunningInfo(const std::string& consumerGroup) {
  auto* consumer = selectConsumer(consumerGroup);
  if (consumer != nullptr) {
    std::unique_ptr<ConsumerRunningInfo> runningInfo(consumer->consumerRunningInfo());
    if (runningInfo != nullptr) {
      std::string nsAddr = getNamesrvAddr();
      runningInfo->setProperty(ConsumerRunningInfo::PROP_NAMESERVER_ADDR, nsAddr);

      if (consumer->consumeType() == CONSUME_PASSIVELY) {
        runningInfo->setProperty(ConsumerRunningInfo::PROP_CONSUME_TYPE, "CONSUME_PASSIVELY");
      } else {
        runningInfo->setProperty(ConsumerRunningInfo::PROP_CONSUME_TYPE, "CONSUME_ACTIVELY");
      }

      runningInfo->setProperty(ConsumerRunningInfo::PROP_CLIENT_VERSION,
                               MQVersion::GetVersionDesc(MQVersion::CURRENT_VERSION));

      return runningInfo;
    }
  }

  LOG_ERROR_NEW("no corresponding consumer found for group:{}", consumerGroup);
  return nullptr;
}

// Sets the address map (broker id -> address) of brokerName, replacing any previous
// entry for that broker. Guarded by broker_addr_table_mutex_.
void MQClientInstance::addBrokerToAddrTable(const std::string& brokerName,
                                            const std::map<int, std::string>& brokerAddrs) {
  std::lock_guard<std::mutex> lock(broker_addr_table_mutex_);
  broker_addr_table_[brokerName] = brokerAddrs;
}

// Replaces the whole broker address table with `table`, moving from it.
// Guarded by broker_addr_table_mutex_.
void MQClientInstance::resetBrokerAddrTable(BrokerAddrMAP&& table) {
  std::lock_guard<std::mutex> guard(broker_addr_table_mutex_);
  // `table` is a plain rvalue reference (not a forwarding reference), so move is the right cast.
  broker_addr_table_ = std::move(table);
}

// Empties the broker address table. Guarded by broker_addr_table_mutex_.
void MQClientInstance::clearBrokerAddrTable() {
  std::lock_guard<std::mutex> lock(broker_addr_table_mutex_);
  broker_addr_table_.clear();
}

// Returns a copy of the broker address table, taken under broker_addr_table_mutex_,
// so callers can iterate it without holding the mutex.
MQClientInstance::BrokerAddrMAP MQClientInstance::getBrokerAddrTable() {
  std::lock_guard<std::mutex> lock(broker_addr_table_mutex_);
  return broker_addr_table_;
}

}  // namespace rocketmq
