| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| #pragma once |
| |
| #include <memory> |
| #include <mutex> |
| #include <string> |
| #include <system_error> |
| |
| #include "absl/strings/string_view.h" |
| |
| #include "ClientConfigImpl.h" |
| #include "ClientImpl.h" |
| #include "ClientManagerImpl.h" |
| #include "ConsumeMessageService.h" |
| #include "FilterExpression.h" |
| #include "ProcessQueue.h" |
| #include "PushConsumer.h" |
| #include "Scheduler.h" |
| #include "TopicAssignmentInfo.h" |
| #include "TopicPublishInfo.h" |
| #include "UtilAll.h" |
| #include "apache/rocketmq/v1/service.pb.h" |
| #include "rocketmq/DefaultMQPushConsumer.h" |
| #include "rocketmq/OffsetStore.h" |
| #include "rocketmq/State.h" |
| |
| ROCKETMQ_NAMESPACE_BEGIN |
| |
| class ConsumeMessageService; |
| class ConsumeFifoMessageService; |
| class ConsumeStandardMessageService; |
| |
| class PushConsumerImpl : virtual public ClientImpl, |
| virtual public PushConsumer, |
| public std::enable_shared_from_this<PushConsumerImpl> { |
| public: |
| explicit PushConsumerImpl(absl::string_view group_name); |
| |
| ~PushConsumerImpl() override; |
| |
| void prepareHeartbeatData(HeartbeatRequest& request) override LOCKS_EXCLUDED(topic_filter_expression_table_mtx_); |
| |
| void start() override; |
| |
| void shutdown() override; |
| |
| void subscribe(const std::string& topic, const std::string& expression, |
| ExpressionType expression_type = ExpressionType::TAG) |
| LOCKS_EXCLUDED(topic_filter_expression_table_mtx_); |
| |
| void unsubscribe(const std::string& topic) LOCKS_EXCLUDED(topic_filter_expression_table_mtx_); |
| |
| absl::optional<FilterExpression> getFilterExpression(const std::string& topic) const override |
| LOCKS_EXCLUDED(topic_filter_expression_table_mtx_); |
| |
| void setConsumeFromWhere(ConsumeFromWhere consume_from_where); |
| |
| void registerMessageListener(MessageListener* message_listener); |
| |
| void scanAssignments() LOCKS_EXCLUDED(topic_filter_expression_table_mtx_); |
| |
| static bool selectBroker(const TopicRouteDataPtr& route, std::string& broker_host); |
| |
| void wrapQueryAssignmentRequest(const std::string& topic, const std::string& consumer_group, |
| const std::string& client_id, const std::string& strategy_name, |
| QueryAssignmentRequest& request); |
| |
| /** |
| * Query assignment of the specified topic from load balancer directly if |
| * message consuming mode is clustering. In case current client is operating |
| * in the broadcasting mode, assignments are constructed locally from topic |
| * route entries. |
| * |
| * @param topic Topic to query |
| * @return shared pointer to topic assignment info |
| */ |
| void queryAssignment(const std::string& topic, |
| const std::function<void(const std::error_code&, const TopicAssignmentPtr&)>& cb); |
| |
| void syncProcessQueue(const std::string& topic, const TopicAssignmentPtr& topic_assignment, |
| const FilterExpression& filter_expression) LOCKS_EXCLUDED(process_queue_table_mtx_); |
| |
| ProcessQueueSharedPtr getOrCreateProcessQueue(const MQMessageQueue& message_queue, |
| const FilterExpression& filter_expression) |
| LOCKS_EXCLUDED(process_queue_table_mtx_); |
| |
| bool receiveMessage(const MQMessageQueue& message_queue, const FilterExpression& filter_expression) override |
| LOCKS_EXCLUDED(process_queue_table_mtx_); |
| |
| uint32_t consumeThreadPoolSize() const; |
| |
| void consumeThreadPoolSize(int thread_pool_size); |
| |
| int32_t maxDeliveryAttempts() const override { |
| return max_delivery_attempts_; |
| } |
| |
| uint32_t consumeBatchSize() const override; |
| |
| void consumeBatchSize(uint32_t consume_batch_size); |
| |
| int32_t receiveBatchSize() const override { |
| return receive_batch_size_; |
| } |
| |
| std::shared_ptr<ConsumeMessageService> getConsumeMessageService() override; |
| |
| void ack(const MQMessageExt& msg, const std::function<void(const std::error_code&)>& callback) override; |
| |
| /** |
| * Negative acknowledge the given message; Refer to |
| * https://en.wikipedia.org/wiki/Acknowledgement_(data_networks) for |
| * background info. |
| * |
| * Current implementation is to change invisible time of the given message. |
| * |
| * @param message Message to negate on the broker side. |
| */ |
| void nack(const MQMessageExt& message, const std::function<void(const std::error_code&)>& callback) override; |
| |
| void forwardToDeadLetterQueue(const MQMessageExt& message, const std::function<void(bool)>& cb) override; |
| |
| void wrapAckMessageRequest(const MQMessageExt& msg, AckMessageRequest& request); |
| |
| // only for test |
| std::size_t getProcessQueueTableSize() LOCKS_EXCLUDED(process_queue_table_mtx_); |
| |
| void setCustomExecutor(const Executor& executor) { |
| custom_executor_ = executor; |
| } |
| |
| const Executor& customExecutor() const override { |
| return custom_executor_; |
| } |
| |
| void setThrottle(const std::string& topic, uint32_t threshold); |
| |
| MessageModel messageModel() const override { |
| return message_model_; |
| } |
| |
| void setMessageModel(MessageModel message_model) { |
| message_model_ = message_model; |
| } |
| |
| void offsetStore(std::unique_ptr<OffsetStore> offset_store) { |
| offset_store_ = std::move(offset_store); |
| } |
| |
| void updateOffset(const MQMessageQueue& message_queue, int64_t offset) override { |
| if (offset_store_) { |
| offset_store_->updateOffset(message_queue, offset); |
| } |
| } |
| |
| /** |
| * Max number of messages that may be cached per queue before applying |
| * back-pressure. |
| * @return |
| */ |
| uint32_t maxCachedMessageQuantity() const override { |
| return MixAll::DEFAULT_CACHED_MESSAGE_COUNT; |
| } |
| |
| /** |
| * Threshold of total cached message body size by queue before applying |
| * back-pressure. |
| * @return |
| */ |
| uint64_t maxCachedMessageMemory() const override { |
| return MixAll::DEFAULT_CACHED_MESSAGE_MEMORY; |
| } |
| |
| void iterateProcessQueue(const std::function<void(ProcessQueueSharedPtr)>& callback) override; |
| |
| MessageListener* messageListener() override { |
| return message_listener_; |
| } |
| |
| protected: |
| std::shared_ptr<ClientImpl> self() override { |
| return shared_from_this(); |
| } |
| |
| ClientResourceBundle resourceBundle() LOCKS_EXCLUDED(topic_filter_expression_table_mtx_) override; |
| |
| void notifyClientTermination() override; |
| |
| private: |
| absl::flat_hash_map<std::string, FilterExpression> |
| topic_filter_expression_table_ GUARDED_BY(topic_filter_expression_table_mtx_); |
| mutable absl::Mutex topic_filter_expression_table_mtx_; |
| |
| /** |
| * Consume message thread pool size. |
| */ |
| uint32_t consume_thread_pool_size_{MixAll::DEFAULT_CONSUME_THREAD_POOL_SIZE}; |
| |
| MessageListener* message_listener_{nullptr}; |
| |
| std::shared_ptr<ConsumeMessageService> consume_message_service_; |
| uint32_t consume_batch_size_{MixAll::DEFAULT_CONSUME_MESSAGE_BATCH_SIZE}; |
| |
| int32_t receive_batch_size_{MixAll::DEFAULT_RECEIVE_MESSAGE_BATCH_SIZE}; |
| |
| std::uintptr_t scan_assignment_handle_{0}; |
| static const char* SCAN_ASSIGNMENT_TASK_NAME; |
| |
| absl::flat_hash_map<MQMessageQueue, ProcessQueueSharedPtr> process_queue_table_ GUARDED_BY(process_queue_table_mtx_); |
| absl::Mutex process_queue_table_mtx_; |
| |
| ConsumeFromWhere consume_from_where_{ConsumeFromWhere::CONSUME_FROM_LAST_OFFSET}; |
| Executor custom_executor_; |
| |
| absl::flat_hash_map<std::string /* Topic */, uint32_t /* Threshold */> |
| throttle_table_ GUARDED_BY(throttle_table_mtx_); |
| absl::Mutex throttle_table_mtx_; |
| |
| int32_t max_delivery_attempts_{MixAll::DEFAULT_MAX_DELIVERY_ATTEMPTS}; |
| |
| MessageModel message_model_{MessageModel::CLUSTERING}; |
| |
| mutable std::unique_ptr<OffsetStore> offset_store_; |
| |
| void fetchRoutes() LOCKS_EXCLUDED(topic_filter_expression_table_mtx_); |
| |
| friend class ConsumeMessageService; |
| friend class ConsumeFifoMessageService; |
| friend class ConsumeStandardMessageService; |
| }; |
| |
| ROCKETMQ_NAMESPACE_END |