| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| #include "TopicPublishInfo.h" |
| |
| #include <memory> |
| #include <utility> |
| |
| #include "rocketmq/Logger.h" |
| #include "spdlog/spdlog.h" |
| #include "MixAll.h" |
| #include "ProducerImpl.h" |
| #include "TopicRouteData.h" |
| #include "absl/container/flat_hash_map.h" |
| #include "absl/strings/str_join.h" |
| |
| ROCKETMQ_NAMESPACE_BEGIN |
| |
| thread_local uint32_t TopicPublishInfo::send_which_queue_ = MixAll::random(0, 100); |
| |
| TopicPublishInfo::TopicPublishInfo(std::weak_ptr<ProducerImpl> producer, |
| absl::string_view topic, |
| TopicRouteDataPtr topic_route_data) |
| : producer_(std::move(producer)), |
| topic_(topic.data(), topic.length()), |
| topic_route_data_(std::move(topic_route_data)) { |
| updatePublishInfo(); |
| } |
| |
| bool TopicPublishInfo::selectMessageQueues(absl::optional<std::string> message_group, |
| std::vector<rmq::MessageQueue>& result) { |
| { |
| absl::MutexLock lock(&queue_list_mtx_); |
| if (queue_list_.empty()) { |
| SPDLOG_WARN("Failed to screen viable message queues from TopicPublishInfo because there is no candidates at all"); |
| return false; |
| } |
| } |
| |
| auto producer = producer_.lock(); |
| if (!producer) { |
| return false; |
| } |
| |
| // Select a message queue according to given message group, which is used to send FIFO message. |
| if (message_group.has_value()) { |
| std::size_t hash_code = std::hash<std::string>{}(message_group.value()); |
| absl::MutexLock lk(&queue_list_mtx_); |
| std::size_t index = hash_code % queue_list_.size(); |
| result.push_back(queue_list_[index]); |
| return true; |
| } |
| |
| unsigned int index = ++send_which_queue_; |
| // For standard messages, select at most "count" message queues in case retries are required. |
| { |
| absl::MutexLock lock(&queue_list_mtx_); |
| for (std::vector<rmq::MessageQueue>::size_type i = 0; i < queue_list_.size(); ++i) { |
| const rmq::MessageQueue& message_queue = queue_list_[index++ % (queue_list_.size())]; |
| if (!producer->isEndpointIsolated(urlOf(message_queue))) { |
| auto search = std::find_if(result.begin(), result.end(), [&](const rmq::MessageQueue& item) { |
| return item.broker().name() == message_queue.broker().name(); |
| }); |
| if (std::end(result) == search) { |
| result.emplace_back(message_queue); |
| } |
| if (result.size() >= producer->maxAttemptTimes()) { |
| return true; |
| } |
| } |
| } |
| } |
| return !result.empty(); |
| } |
| |
| void TopicPublishInfo::updatePublishInfo() { |
| std::vector<rmq::MessageQueue> writable_queue_list; |
| { |
| for (const auto& queue : topic_route_data_->messageQueues()) { |
| assert(queue.topic().name() == topic_); |
| if (!writable(queue.permission())) { |
| continue; |
| } |
| if (MixAll::MASTER_BROKER_ID != queue.broker().id()) { |
| continue; |
| } |
| writable_queue_list.push_back(queue); |
| } |
| } |
| |
| if (writable_queue_list.empty()) { |
| SPDLOG_WARN("No writable queue is current available. Skip updating publish table for topic={}", topic_); |
| return; |
| } |
| |
| { |
| absl::MutexLock lk(&queue_list_mtx_); |
| queue_list_.swap(writable_queue_list); |
| } |
| } |
| |
| void TopicPublishInfo::topicRouteData(TopicRouteDataPtr topic_route_data) { |
| SPDLOG_DEBUG("Update publish info according to renewed route data of topic={}", topic_); |
| topic_route_data_ = std::move(topic_route_data); |
| updatePublishInfo(); |
| } |
| |
| std::vector<rmq::MessageQueue> TopicPublishInfo::getMessageQueueList() { |
| absl::MutexLock lock(&queue_list_mtx_); |
| return queue_list_; |
| } |
| |
| ROCKETMQ_NAMESPACE_END |