| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| #pragma once |
| |
#include <apache/rocketmq/v1/definition.pb.h>

#include <atomic>
#include <cassert>
#include <chrono>
#include <cstdint>
#include <cstdlib>
#include <memory>
#include <set>
#include <string>
#include <vector>

#include "Assignment.h"
#include "ClientManager.h"
#include "FilterExpression.h"
#include "MixAll.h"
#include "ProcessQueue.h"
#include "ReceiveMessageCallback.h"
#include "TopicAssignmentInfo.h"
#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
#include "apache/rocketmq/v1/service.pb.h"
#include "rocketmq/ConsumeType.h"
#include "rocketmq/MQMessageExt.h"
#include "rocketmq/MQMessageQueue.h"
#include "gtest/gtest_prod.h"
| |
| ROCKETMQ_NAMESPACE_BEGIN |
| |
/**
 * @brief Plain record pairing an offset with a "released" flag.
 *
 * Both fields are public and are always set by one of the two constructors.
 */
struct OffsetRecord {
  /**
   * @param offset Value stored in offset_.
   * @param released Value stored in released_.
   */
  OffsetRecord(int64_t offset, bool released) : offset_{offset}, released_{released} {
  }

  /**
   * Creates a record whose released_ flag starts out false.
   *
   * @param offset Value stored in offset_.
   */
  explicit OffsetRecord(int64_t offset) : OffsetRecord(offset, false) {
  }

  int64_t offset_;
  bool released_;
};
| |
| ROCKETMQ_NAMESPACE_END |
| |
| namespace std { |
| |
| template <> |
| struct less<ROCKETMQ_NAMESPACE::OffsetRecord> { |
| bool operator()(const ROCKETMQ_NAMESPACE::OffsetRecord& lhs, const ROCKETMQ_NAMESPACE::OffsetRecord& rhs) const { |
| return lhs.offset_ < rhs.offset_; |
| } |
| }; |
| |
| } // namespace std |
| |
| ROCKETMQ_NAMESPACE_BEGIN |
| |
| class PushConsumer; |
| |
| /** |
| * @brief Once messages are fetched(either pulled or popped) from remote server, they are firstly put into cache. |
| * Dispatcher thread, after waking up, will submit them into thread-pool. Messages at this phase are called "inflight" |
| * state. Once messages are processed by user-passed-in callback, their quota will be released for future incoming |
| * messages. |
| */ |
| class ProcessQueueImpl : virtual public ProcessQueue { |
| public: |
| ProcessQueueImpl(MQMessageQueue message_queue, FilterExpression filter_expression, |
| std::weak_ptr<PushConsumer> consumer, std::shared_ptr<ClientManager> client_instance); |
| |
| ~ProcessQueueImpl() override; |
| |
| void callback(std::shared_ptr<ReceiveMessageCallback> callback) override; |
| |
| MQMessageQueue getMQMessageQueue() override; |
| |
| bool expired() const override; |
| |
| bool shouldThrottle() const override LOCKS_EXCLUDED(messages_mtx_); |
| |
| const FilterExpression& getFilterExpression() const override; |
| |
| std::weak_ptr<PushConsumer> getConsumer() override; |
| |
| std::shared_ptr<ClientManager> getClientManager() override; |
| |
| void receiveMessage() override; |
| |
| const std::string& simpleName() const override { |
| return simple_name_; |
| } |
| |
| std::string topic() const override { |
| return message_queue_.getTopic(); |
| } |
| |
| bool hasPendingMessages() const override LOCKS_EXCLUDED(messages_mtx_); |
| |
| /** |
| * Put message fetched from broker into cache. |
| * |
| * @param messages |
| */ |
| void cacheMessages(const std::vector<MQMessageExt>& messages) override LOCKS_EXCLUDED(messages_mtx_, offsets_mtx_); |
| |
| /** |
| * @return Number of messages that is not yet dispatched to thread pool, likely, due to topic-rate-limiting. |
| */ |
| uint32_t cachedMessagesSize() const LOCKS_EXCLUDED(messages_mtx_) { |
| absl::MutexLock lk(&messages_mtx_); |
| return cached_messages_.size(); |
| } |
| |
| /** |
| * Dispatch messages from cache to thread pool in form of consumeTask. |
| * @param batch_size |
| * @param messages |
| * @return true if there are more messages to consume in cache |
| */ |
| bool take(uint32_t batch_size, std::vector<MQMessageExt>& messages) override LOCKS_EXCLUDED(messages_mtx_); |
| |
| void syncIdleState() override { |
| idle_since_ = std::chrono::steady_clock::now(); |
| } |
| |
| void nextOffset(int64_t next_offset) override { |
| assert(next_offset >= 0); |
| next_offset_ = next_offset; |
| } |
| |
| int64_t nextOffset() const { |
| return next_offset_; |
| } |
| |
| bool committedOffset(int64_t& offset) override LOCKS_EXCLUDED(offsets_mtx_); |
| |
| void release(uint64_t body_size, int64_t offset) override LOCKS_EXCLUDED(messages_mtx_, offsets_mtx_); |
| |
| bool unbindFifoConsumeTask() override { |
| bool expected = true; |
| return has_fifo_task_bound_.compare_exchange_strong(expected, false, std::memory_order_relaxed); |
| } |
| |
| bool bindFifoConsumeTask() override { |
| bool expected = false; |
| return has_fifo_task_bound_.compare_exchange_strong(expected, true, std::memory_order_relaxed); |
| } |
| |
| private: |
| MQMessageQueue message_queue_; |
| |
| /** |
| * Expression used to filter message in the server side. |
| */ |
| const FilterExpression filter_expression_; |
| |
| std::chrono::milliseconds invisible_time_; |
| |
| std::chrono::steady_clock::time_point idle_since_{std::chrono::steady_clock::now()}; |
| |
| absl::Time create_timestamp_{absl::Now()}; |
| |
| std::string simple_name_; |
| |
| std::weak_ptr<PushConsumer> consumer_; |
| std::shared_ptr<ClientManager> client_manager_; |
| |
| std::shared_ptr<ReceiveMessageCallback> receive_callback_; |
| |
| /** |
| * Messages that are pending to be submitted to thread pool. |
| */ |
| mutable std::vector<MQMessageExt> cached_messages_ GUARDED_BY(messages_mtx_); |
| |
| mutable absl::Mutex messages_mtx_; |
| |
| /** |
| * @brief Quantity of the cached messages. |
| * |
| */ |
| std::atomic<uint32_t> cached_message_quantity_; |
| |
| /** |
| * @brief Total body memory size of the cached messages. |
| * |
| */ |
| std::atomic<uint64_t> cached_message_memory_; |
| |
| int64_t next_offset_{0}; |
| |
| /** |
| * If this process queue is used in FIFO scenario, this field marks if there is an task in thread pool. |
| */ |
| std::atomic_bool has_fifo_task_bound_{false}; |
| |
| std::set<OffsetRecord> offsets_ GUARDED_BY(offsets_mtx_); |
| absl::Mutex offsets_mtx_; |
| |
| void popMessage(); |
| void wrapPopMessageRequest(absl::flat_hash_map<std::string, std::string>& metadata, |
| rmq::ReceiveMessageRequest& request); |
| |
| void pullMessage(); |
| void wrapPullMessageRequest(rmq::PullMessageRequest& request); |
| |
| void wrapFilterExpression(rmq::FilterExpression* filter_expression); |
| |
| FRIEND_TEST(ProcessQueueTest, testExpired); |
| }; |
| |
| ROCKETMQ_NAMESPACE_END |