| #pragma once |
| |
| #include <atomic> |
| #include <chrono> |
| #include <cstdint> |
| #include <cstdlib> |
| #include <system_error> |
| |
| #include "RpcClient.h" |
| #include "absl/strings/string_view.h" |
| #include "apache/rocketmq/v1/definition.pb.h" |
| |
| #include "Client.h" |
| #include "ClientConfigImpl.h" |
| #include "ClientManager.h" |
| #include "ClientResourceBundle.h" |
| #include "InvocationContext.h" |
| #include "NameServerResolver.h" |
| #include "OtlpExporter.h" |
| #include "rocketmq/MQMessageExt.h" |
| #include "rocketmq/MessageListener.h" |
| #include "rocketmq/State.h" |
| |
| ROCKETMQ_NAMESPACE_BEGIN |
| |
| class ClientImpl : public ClientConfigImpl, virtual public Client { |
| public: |
| explicit ClientImpl(absl::string_view group_name); |
| |
| ~ClientImpl() override = default; |
| |
| virtual void start(); |
| |
| virtual void shutdown(); |
| |
| void getRouteFor(const std::string& topic, const std::function<void(const std::error_code&, TopicRouteDataPtr)>& cb) |
| LOCKS_EXCLUDED(inflight_route_requests_mtx_, topic_route_table_mtx_); |
| |
| /** |
| * Gather collection of endpoints that are reachable from latest topic route |
| * table. |
| * |
| * @param endpoints |
| */ |
| void endpointsInUse(absl::flat_hash_set<std::string>& endpoints) override LOCKS_EXCLUDED(topic_route_table_mtx_); |
| |
| void heartbeat() override; |
| |
| bool active() override { |
| State state = state_.load(std::memory_order_relaxed); |
| return State::STARTING == state || State::STARTED == state; |
| } |
| |
| void onRemoteEndpointRemoval(const std::vector<std::string>& hosts) override LOCKS_EXCLUDED(isolated_endpoints_mtx_); |
| |
| void healthCheck() override LOCKS_EXCLUDED(isolated_endpoints_mtx_); |
| |
| void schedule(const std::string& task_name, const std::function<void(void)>& task, |
| std::chrono::milliseconds delay) override; |
| |
| void withNameServerResolver(std::shared_ptr<NameServerResolver> name_server_resolver) { |
| name_server_resolver_ = std::move(name_server_resolver); |
| } |
| |
| /** |
| * Expose for test purpose only. |
| */ |
| void state(State state) { |
| state_.store(state, std::memory_order_relaxed); |
| } |
| |
| protected: |
| ClientManagerPtr client_manager_; |
| std::shared_ptr<OtlpExporter> exporter_; |
| std::atomic<State> state_; |
| |
| absl::flat_hash_map<std::string, TopicRouteDataPtr> topic_route_table_ GUARDED_BY(topic_route_table_mtx_); |
| absl::Mutex topic_route_table_mtx_ ACQUIRED_AFTER(inflight_route_requests_mtx_); // protects topic_route_table_ |
| |
| absl::flat_hash_map<std::string, std::vector<std::function<void(const std::error_code&, const TopicRouteDataPtr&)>>> |
| inflight_route_requests_ GUARDED_BY(inflight_route_requests_mtx_); |
| absl::Mutex inflight_route_requests_mtx_ ACQUIRED_BEFORE(topic_route_table_mtx_); // Protects inflight_route_requests_ |
| static const char* UPDATE_ROUTE_TASK_NAME; |
| std::uint32_t route_update_handle_{0}; |
| |
| // Set Name Server Resolver |
| std::shared_ptr<NameServerResolver> name_server_resolver_; |
| |
| absl::flat_hash_map<std::string, absl::Time> multiplexing_requests_; |
| absl::Mutex multiplexing_requests_mtx_; |
| |
| absl::flat_hash_set<std::string> isolated_endpoints_ GUARDED_BY(isolated_endpoints_mtx_); |
| absl::Mutex isolated_endpoints_mtx_; |
| |
| void updateRouteInfo() LOCKS_EXCLUDED(topic_route_table_mtx_); |
| |
| /** |
| * Sub-class is supposed to inherit from std::enable_shared_from_this. |
| */ |
| virtual std::shared_ptr<ClientImpl> self() = 0; |
| |
| virtual void prepareHeartbeatData(HeartbeatRequest& request) = 0; |
| |
| virtual void verifyMessageConsumption(std::string remote_address, std::string command_id, MQMessageExt message); |
| |
| /** |
| * @brief Execute transaction-state-checker to commit or roll-back the orphan transactional message. |
| * |
| * It is no-op by default and Producer-subclass is supposed to override it. |
| * |
| * @param transaction_id |
| * @param message |
| */ |
| virtual void resolveOrphanedTransactionalMessage(const std::string& transaction_id, const MQMessageExt& message) { |
| } |
| |
| /** |
| * Concrete publisher/subscriber client is expected to fill other |
| * type-specific resources. |
| */ |
| virtual ClientResourceBundle resourceBundle() { |
| ClientResourceBundle resource_bundle; |
| resource_bundle.client_id = clientId(); |
| resource_bundle.resource_namespace = resource_namespace_; |
| return resource_bundle; |
| } |
| |
| void setAccessPoint(rmq::Endpoints* endpoints); |
| |
| void notifyClientTermination() override; |
| |
| void notifyClientTermination(const NotifyClientTerminationRequest& request); |
| |
| /** |
| * @brief Return application developer provided message listener if this client is of PushConsumer type. |
| * |
| * By default, it returns nullptr such that error messages are generated and directed to server immediately. |
| * |
| * @return nullptr by default. |
| */ |
| virtual MessageListener* messageListener() { |
| return nullptr; |
| } |
| |
| private: |
| /** |
| * This is a low-level API that fetches route data from name server through |
| * gRPC unary request/response. Once request/response is completed, either |
| * timeout or response arrival in time, callback would get invoked. |
| * @param topic |
| * @param cb |
| */ |
| void fetchRouteFor(const std::string& topic, |
| const std::function<void(const std::error_code&, const TopicRouteDataPtr&)>& cb); |
| |
| /** |
| * Callback to execute once route data is fetched from name server. |
| * @param topic |
| * @param route |
| */ |
| void onTopicRouteReady(const std::string& topic, const std::error_code& ec, const TopicRouteDataPtr& route) |
| LOCKS_EXCLUDED(inflight_route_requests_mtx_); |
| |
| /** |
| * Update local cache for the topic. Note, route differences are logged in |
| * INFO level since route bears fundamental importance. |
| * |
| * @param topic |
| * @param route |
| */ |
| void updateRouteCache(const std::string& topic, const std::error_code& ec, const TopicRouteDataPtr& route) |
| LOCKS_EXCLUDED(topic_route_table_mtx_); |
| |
| void pollCommand(const std::string& target); |
| |
| void onPollCommandResponse(const InvocationContext<PollCommandResponse>* ctx); |
| |
| void onHealthCheckResponse(const std::error_code& endpoint, const InvocationContext<HealthCheckResponse>* ctx) |
| LOCKS_EXCLUDED(isolated_endpoints_mtx_); |
| |
| void doVerify(std::string target, std::string command_id, MQMessageExt message); |
| }; |
| |
| ROCKETMQ_NAMESPACE_END |