blob: 5f1b27cae53c45030cf35e6141ce26751bd5ca1e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <atomic>
#include <chrono>
#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <system_error>
#include <vector>
#include "Client.h"
#include "ClientManager.h"
#include "InsecureCertificateVerifier.h"
#include "ReceiveMessageCallback.h"
#include "RpcClientImpl.h"
#include "ThreadPoolImpl.h"
#include "TopicRouteData.h"
#include "absl/base/thread_annotations.h"
#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
#include "absl/synchronization/mutex.h"
#include "rocketmq/State.h"
ROCKETMQ_NAMESPACE_BEGIN
/**
* @brief Concrete implementation of the ClientManager interface.
*
* As the member declarations below show, an instance keeps a table of RPC clients keyed by a string
* (rpc_clients_), a list of weak references to observing Client objects (clients_), a scheduler, a callback thread
* pool and the gRPC TLS/channel configuration. The class derives from std::enable_shared_from_this, so it can hand
* out shared_ptr/weak_ptr references to itself when it is owned by a std::shared_ptr.
*
* Thread-safety annotations (LOCKS_EXCLUDED / GUARDED_BY) document which mutex protects which member and which
* methods must not be entered while a given mutex is already held by the caller.
*/
class ClientManagerImpl : virtual public ClientManager, public std::enable_shared_from_this<ClientManagerImpl> {
public:
/**
* @brief Construct a new Client Manager Impl object
* TODO: Make the constructor protected so that instances can only be created through ClientManagerFactory,
* effectively achieving a singleton.
* @param resource_namespace Abstract resource namespace, in which this client manager lives.
* @param with_ssl Defaults to true. NOTE(review): presumably selects whether channels are created with TLS
* (see with_ssl_ and the TLS members below) — confirm against the implementation.
*/
explicit ClientManagerImpl(std::string resource_namespace, bool with_ssl = true);
~ClientManagerImpl() override;
/**
* Start this client manager. Overrides ClientManager::start.
*/
void start() override;
/**
* Shut down this client manager. Must not be called while the caller holds rpc_clients_mtx_.
*/
void shutdown() override LOCKS_EXCLUDED(rpc_clients_mtx_);
/**
* Create a gRPC channel for the given target.
*
* @param target_host Address of the remote endpoint the channel is created for.
* @return Shared pointer to the gRPC channel.
*/
std::shared_ptr<grpc::Channel> createChannel(const std::string& target_host) override;
/**
* Resolve route data from name server for the given topic.
*
* @param target_host Name server host address;
* @param metadata Request headers;
* @param request Query route entries request.
* @param timeout RPC timeout.
* @param cb Callback to execute once the request/response completes. It receives an error code and the resolved
* topic route data pointer.
*/
void resolveRoute(const std::string& target_host,
const Metadata& metadata,
const QueryRouteRequest& request,
std::chrono::milliseconds timeout,
const std::function<void(const std::error_code&, const TopicRouteDataPtr&)>& cb) override
LOCKS_EXCLUDED(rpc_clients_mtx_);
/**
* If inactive RPC clients refer to remote hosts that are absent from topic_route_table_, we need to purge them
* immediately.
*
* Must not be called while the caller holds clients_mtx_ or rpc_clients_mtx_.
*
* @return A list of strings. NOTE(review): presumably the hosts whose RPC clients were purged — confirm against the
* implementation.
*/
std::vector<std::string> cleanOfflineRpcClients() LOCKS_EXCLUDED(clients_mtx_, rpc_clients_mtx_);
/**
* Send a message to the given host.
*
* @param target_host Address of the remote host to send to.
* @param metadata Request headers.
* @param request Send-message request. Taken by non-const reference, so the implementation is allowed to modify it.
* @param cb Callback that receives the send result.
* @return NOTE(review): the meaning of the boolean is not visible in this header; presumably whether the request was
* dispatched — confirm against the implementation.
*/
bool send(const std::string& target_host,
const Metadata& metadata,
SendMessageRequest& request,
SendResultCallback cb) override LOCKS_EXCLUDED(rpc_clients_mtx_);
/**
* Get a RpcClient according to the given target hosts, which follows the scheme specified in
* https://github.com/grpc/grpc/blob/master/doc/naming.md
*
* Note that a channel in gRPC is composed of one or more sub-channels. Every sub-channel represents a solid TCP
* connection. gRPC supports a number of configurable load-balancing policies with "pick-first" as the default option.
* How requests are distributed across sub-channels depends on the load-balancing policy in use.
*
* Must not be called while the caller holds rpc_clients_mtx_.
*
* @param target_host Target address, in a form accepted by the gRPC naming scheme linked above.
* @param need_heartbeat Defaults to true. NOTE(review): presumably marks whether the returned client takes part in
* heartbeating — confirm against the implementation.
* @return Shared pointer to the RPC client for the target.
*/
RpcClientSharedPtr getRpcClient(const std::string& target_host, bool need_heartbeat = true) override
LOCKS_EXCLUDED(rpc_clients_mtx_);
/**
* Convert a send-message response into a SendResult. Static: does not touch any instance state.
*
* @param message_queue Message queue associated with the response.
* @param response Response returned by the server.
* @param ec Output parameter receiving the error code derived from the response.
* @return The resulting SendResult.
*/
static SendResult processSendResponse(const rmq::MessageQueue& message_queue,
const SendMessageResponse& response,
std::error_code& ec);
// Test purpose only: register an RPC client for the given target host.
void addRpcClient(const std::string& target_host, const RpcClientSharedPtr& client) LOCKS_EXCLUDED(rpc_clients_mtx_);
// Test purpose only
void cleanRpcClients() LOCKS_EXCLUDED(rpc_clients_mtx_);
/**
* Register a client as an observer of this manager. Only a weak reference is taken, so registration does not extend
* the lifetime of the client.
*
* @param client Weak reference to the observing client.
*/
void addClientObserver(std::weak_ptr<Client> client) override;
/**
* Query assignment from the given target.
*
* @param target Address of the remote host.
* @param metadata Request headers.
* @param request Query assignment request.
* @param timeout RPC timeout.
* @param cb Callback that receives an error code and the query assignment response.
*/
void queryAssignment(const std::string& target,
const Metadata& metadata,
const QueryAssignmentRequest& request,
std::chrono::milliseconds timeout,
const std::function<void(const std::error_code&, const QueryAssignmentResponse&)>& cb) override;
/**
* Receive messages from the given target. Must not be called while the caller holds rpc_clients_mtx_.
*
* @param target Address of the remote host.
* @param metadata Request headers.
* @param request Receive message request.
* @param timeout RPC timeout.
* @param cb Callback invoked with the outcome of the receive operation.
*/
void receiveMessage(const std::string& target,
const Metadata& metadata,
const ReceiveMessageRequest& request,
std::chrono::milliseconds timeout,
ReceiveMessageCallback cb) override LOCKS_EXCLUDED(rpc_clients_mtx_);
/**
* Translate protobuf message struct to domain model.
*
* @param item Protobuf message to translate.
* @return Shared pointer to the translated message. NOTE(review): the previous comment described a boolean result;
* presumably a null pointer is returned if something goes wrong, including checksum verification failure — confirm
* against the implementation.
*/
MessageConstSharedPtr wrapMessage(const rmq::Message& item) override;
/**
* @return The scheduler used by this client manager.
*/
SchedulerSharedPtr getScheduler() override;
/**
* Ack message asynchronously.
* @param target_host Target broker host address.
* @param metadata Request headers.
* @param request Ack message request.
* @param timeout RPC timeout.
* @param cb Callback that receives the resulting error code.
*/
void ack(const std::string& target_host,
const Metadata& metadata,
const AckMessageRequest& request,
std::chrono::milliseconds timeout,
const std::function<void(const std::error_code&)>& cb) override;
/**
* Change the invisible duration of a message.
*
* @param target_host Address of the remote host.
* @param metadata Request headers.
* @param (unnamed) Change-invisible-duration request.
* @param timeout RPC timeout.
* @param (unnamed) Callback that receives the resulting error code.
*/
void changeInvisibleDuration(const std::string& target_host,
const Metadata& metadata,
const ChangeInvisibleDurationRequest&,
std::chrono::milliseconds timeout,
const std::function<void(const std::error_code&)>&) override;
/**
* Forward a message to the dead letter queue.
*
* @param target_host Address of the remote host.
* @param metadata Request headers.
* @param request Forward-to-dead-letter-queue request.
* @param timeout RPC timeout.
* @param cb Callback that receives the resulting error code.
*/
void forwardMessageToDeadLetterQueue(const std::string& target_host,
const Metadata& metadata,
const ForwardMessageToDeadLetterQueueRequest& request,
std::chrono::milliseconds timeout,
const std::function<void(const std::error_code&)>& cb) override;
/**
* End a transaction asynchronously.
*
* Callback conforms the following method signature:
* void callable(const std::error_code& ec, const EndTransactionResponse& response)
*
* NOTE(review): an earlier revision of this comment described a `bool rpc_ok` first argument; the declared callback
* takes a std::error_code instead. Presumably a non-zero error code means the response should not be inspected —
* confirm against the implementation.
* @param target_host Address of the remote host.
* @param metadata Request headers.
* @param request End transaction request.
* @param timeout RPC timeout.
* @param cb Callback that receives an error code and the end transaction response.
*/
void endTransaction(const std::string& target_host,
const Metadata& metadata,
const EndTransactionRequest& request,
std::chrono::milliseconds timeout,
const std::function<void(const std::error_code&, const EndTransactionResponse&)>& cb) override;
/**
* Notify the given host that a client is terminating. Unlike most RPC methods of this class, the outcome is
* returned directly rather than delivered through a callback.
*
* @param target_host Address of the remote host.
* @param metadata Request headers.
* @param request Notify client termination request.
* @param timeout RPC timeout.
* @return Error code describing the outcome.
*/
std::error_code notifyClientTermination(const std::string& target_host,
const Metadata& metadata,
const NotifyClientTerminationRequest& request,
std::chrono::milliseconds timeout) override;
/**
* Set the trace flag. Writes the plain (non-atomic) member trace_ without taking any lock.
*
* @param trace New value of the flag.
*/
void trace(bool trace) {
trace_ = trace;
}
/**
* Send a heartbeat to the given host.
*
* @param target_host Address of the remote host.
* @param metadata Request headers.
* @param request Heartbeat request.
* @param timeout RPC timeout.
* @param cb Callback that receives an error code and the heartbeat response.
*/
void heartbeat(const std::string& target_host,
const Metadata& metadata,
const HeartbeatRequest& request,
std::chrono::milliseconds timeout,
const std::function<void(const std::error_code&, const HeartbeatResponse&)>& cb) override;
/**
* @return Current lifecycle state. NOTE(review): presumably read from the atomic member state_ — confirm.
*/
State state() const override;
/**
* Submit a task for execution.
*
* @param task Callable to run. NOTE(review): presumably executed on callback_thread_pool_ — confirm against the
* implementation.
*/
void submit(std::function<void()> task) override;
private:
// Heartbeat routine. NOTE(review): presumably run periodically via scheduler_ under HEARTBEAT_TASK_NAME — confirm.
void doHeartbeat();
SchedulerSharedPtr scheduler_;
// Task names; defined out of line.
static const char* HEARTBEAT_TASK_NAME;
static const char* STATS_TASK_NAME;
// Resource namespace passed to the constructor.
std::string resource_namespace_;
// Lifecycle state; atomic, so it can be accessed without a mutex.
std::atomic<State> state_;
// Observing clients, held weakly; protected by clients_mtx_.
std::vector<std::weak_ptr<Client>> clients_ GUARDED_BY(clients_mtx_);
absl::Mutex clients_mtx_;
// RPC clients keyed by a string (the method signatures above suggest the target host).
absl::flat_hash_map<std::string, std::shared_ptr<RpcClient>> rpc_clients_ GUARDED_BY(rpc_clients_mtx_);
absl::Mutex rpc_clients_mtx_; // protects rpc_clients_
// NOTE(review): presumably the identifier of the scheduled heartbeat task; 0 until assigned — confirm.
std::uint32_t heartbeat_task_id_{0};
std::unique_ptr<ThreadPoolImpl> callback_thread_pool_;
// Set of exporter endpoints; protected by exporter_endpoint_set_mtx_.
absl::flat_hash_set<std::string> exporter_endpoint_set_ GUARDED_BY(exporter_endpoint_set_mtx_);
absl::Mutex exporter_endpoint_set_mtx_;
/**
* Tenant-id. Each user shall have one unique identifier.
*/
std::string tenant_id_;
std::string service_name_{"RocketMQ"};
/**
* TLS configuration
*/
std::shared_ptr<grpc::experimental::CertificateVerifier> certificate_verifier_;
grpc::experimental::TlsChannelCredentialsOptions tls_channel_credential_options_;
std::shared_ptr<grpc::ChannelCredentials> channel_credential_;
grpc::ChannelArguments channel_arguments_;
// Flag set through trace(bool); false by default.
bool trace_{false};
// No in-class initializer; expected to be set by the constructor from its with_ssl argument — TODO confirm.
bool with_ssl_;
};
ROCKETMQ_NAMESPACE_END