blob: 77ec31caf8d3485d8ee258a65a87db2c3c7992d1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <atomic>
#include <chrono>
#include <cstdint>
#include <functional>
#include <future>
#include <string>
#include <system_error>
#include <vector>
#include "absl/base/thread_annotations.h"
#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
#include "absl/strings/string_view.h"
#include "absl/synchronization/mutex.h"
#include "Client.h"
#include "ClientManager.h"
#include "HeartbeatDataCallback.h"
#include "Histogram.h"
#include "InvocationContext.h"
#include "OrphanTransactionCallback.h"
#include "ReceiveMessageCallback.h"
#include "RpcClient.h"
#include "RpcClientImpl.h"
#include "SchedulerImpl.h"
#include "SendMessageContext.h"
#include "ThreadPoolImpl.h"
#include "TopAddressing.h"
#include "TopicRouteChangeCallback.h"
#include "TopicRouteData.h"
#include "rocketmq/AsyncCallback.h"
#include "rocketmq/State.h"
ROCKETMQ_NAMESPACE_BEGIN
class ClientManagerImpl : virtual public ClientManager, public std::enable_shared_from_this<ClientManagerImpl> {
public:
/**
* @brief Construct a new Client Manager Impl object
* TODO: Make it protected such that instantiating it through ClientManagerFactory only, achieving Singleton
* effectively.
* @param resource_namespace Abstract resource namespace, in which this client manager lives.
*/
explicit ClientManagerImpl(std::string resource_namespace);
~ClientManagerImpl() override;
void start() override;
void shutdown() override LOCKS_EXCLUDED(rpc_clients_mtx_);
static void assignLabels(Histogram& histogram);
std::shared_ptr<grpc::Channel> createChannel(const std::string& target_host) override;
/**
* Resolve route data from name server for the given topic.
*
* @param target_host Name server host address;
* @param metadata Request headers;
* @param request Query route entries request.
* @param timeout RPC timeout.
* @param cb Callback to execute once the request/response completes.
*/
void resolveRoute(const std::string& target_host, const Metadata& metadata, const QueryRouteRequest& request,
std::chrono::milliseconds timeout,
const std::function<void(const std::error_code&, const TopicRouteDataPtr&)>& cb) override
LOCKS_EXCLUDED(rpc_clients_mtx_);
void doHealthCheck() LOCKS_EXCLUDED(clients_mtx_);
/**
* If inactive RPC clients refer to remote hosts that are absent from topic_route_table_, we need to purge them
* immediately.
*/
std::vector<std::string> cleanOfflineRpcClients() LOCKS_EXCLUDED(clients_mtx_, rpc_clients_mtx_);
/**
* Execute health-check on behalf of the client.
*/
void healthCheck(const std::string& target_host, const Metadata& metadata, const HealthCheckRequest& request,
std::chrono::milliseconds timeout,
const std::function<void(const std::error_code&, const InvocationContext<HealthCheckResponse>*)>& cb)
override LOCKS_EXCLUDED(rpc_clients_mtx_);
bool send(const std::string& target_host, const Metadata& metadata, SendMessageRequest& request,
SendCallback* cb) override LOCKS_EXCLUDED(rpc_clients_mtx_);
/**
* Get a RpcClient according to the given target hosts, which follows scheme specified
* https://github.com/grpc/grpc/blob/master/doc/naming.md
*
* Note that a channel in gRPC is composted of one or more sub-channels. Every sub-channel represents a solid TCP
* connection. gRPC supports a number of configurable load-balancing policy with "pick-first" as the default option.
* Requests are distributed
* @param target_host
* @param need_heartbeat
* @return
*/
RpcClientSharedPtr getRpcClient(const std::string& target_host, bool need_heartbeat = true)
LOCKS_EXCLUDED(rpc_clients_mtx_);
static SendResult processSendResponse(const MQMessageQueue& message_queue, const SendMessageResponse& response);
// only for test
void addRpcClient(const std::string& target_host, const RpcClientSharedPtr& client) LOCKS_EXCLUDED(rpc_clients_mtx_);
// Test purpose only
void cleanRpcClients() LOCKS_EXCLUDED(rpc_clients_mtx_);
void addClientObserver(std::weak_ptr<Client> client) override;
void queryAssignment(const std::string& target, const Metadata& metadata, const QueryAssignmentRequest& request,
std::chrono::milliseconds timeout,
const std::function<void(const std::error_code&, const QueryAssignmentResponse&)>& cb) override;
void receiveMessage(const std::string& target, const Metadata& metadata, const ReceiveMessageRequest& request,
std::chrono::milliseconds timeout, const std::shared_ptr<ReceiveMessageCallback>& cb) override
LOCKS_EXCLUDED(rpc_clients_mtx_);
/**
* Translate protobuf message struct to domain model.
*
* @param item
* @param message_ext
* @return true if the translation succeeded; false if something wrong happens, including checksum verification, etc.
*/
bool wrapMessage(const rmq::Message& item, MQMessageExt& message_ext) override;
Scheduler& getScheduler() override;
/**
* Ack message asynchronously.
* @param target_host Target broker host address.
* @param request Ack message request.
*/
void ack(const std::string& target_host, const Metadata& metadata, const AckMessageRequest& request,
std::chrono::milliseconds timeout, const std::function<void(const std::error_code&)>& cb) override;
void nack(const std::string& target_host, const Metadata& metadata, const NackMessageRequest& request,
std::chrono::milliseconds timeout, const std::function<void(const std::error_code&)>& callback) override;
void forwardMessageToDeadLetterQueue(
const std::string& target_host, const Metadata& metadata, const ForwardMessageToDeadLetterQueueRequest& request,
std::chrono::milliseconds timeout,
const std::function<void(const InvocationContext<ForwardMessageToDeadLetterQueueResponse>*)>& cb) override;
/**
* End a transaction asynchronously.
*
* Callback conforms the following method signature:
* void callable(bool rpc_ok, const EndTransactionResponse& response)
*
* if rpc_ok is false, the end transaction request may never reach the server and consequently no need to inspect the
* response. If rpc_ok is true, response should be further inspected to determine business tier code and logic.
* @param target_host
* @param metadata
* @param request
* @param timeout
* @param cb
*/
void endTransaction(const std::string& target_host, const Metadata& metadata, const EndTransactionRequest& request,
std::chrono::milliseconds timeout,
const std::function<void(const std::error_code&, const EndTransactionResponse&)>& cb) override;
void pollCommand(const std::string& target, const Metadata& metadata, const PollCommandRequest& request,
std::chrono::milliseconds timeout,
const std::function<void(const InvocationContext<PollCommandResponse>*)>& cb) override;
void queryOffset(const std::string& target_host, const Metadata& metadata, const QueryOffsetRequest& request,
std::chrono::milliseconds timeout,
const std::function<void(const std::error_code&, const QueryOffsetResponse&)>& cb) override;
void pullMessage(const std::string& target_host, const Metadata& metadata, const PullMessageRequest& request,
std::chrono::milliseconds timeout,
const std::function<void(const std::error_code&, const ReceiveMessageResult&)>& cb) override;
std::error_code notifyClientTermination(const std::string& target_host, const Metadata& metadata,
const NotifyClientTerminationRequest& request,
std::chrono::milliseconds timeout) override;
std::error_code reportThreadStackTrace(const std::string& target_host, const Metadata& metadata,
const ReportThreadStackTraceRequest& request,
std::chrono::milliseconds timeout) override;
std::error_code reportMessageConsumptionResult(const std::string& target_host, const Metadata& metadata,
const ReportMessageConsumptionResultRequest& request,
std::chrono::milliseconds timeout) override;
void trace(bool trace) {
trace_ = trace;
}
void heartbeat(const std::string& target_host, const Metadata& metadata, const HeartbeatRequest& request,
std::chrono::milliseconds timeout,
const std::function<void(const std::error_code&, const HeartbeatResponse&)>& cb) override;
State state() const override;
void submit(std::function<void()> task) override;
private:
void doHeartbeat();
void pollCompletionQueue();
void logStats();
SchedulerImpl scheduler_;
static const char* HEARTBEAT_TASK_NAME;
static const char* STATS_TASK_NAME;
static const char* HEALTH_CHECK_TASK_NAME;
std::string resource_namespace_;
std::atomic<State> state_;
std::vector<std::weak_ptr<Client>> clients_ GUARDED_BY(clients_mtx_);
absl::Mutex clients_mtx_;
absl::flat_hash_map<std::string, std::shared_ptr<RpcClient>> rpc_clients_ GUARDED_BY(rpc_clients_mtx_);
absl::Mutex rpc_clients_mtx_; // protects rpc_clients_
std::uint32_t heartbeat_task_id_{0};
std::uint32_t health_check_task_id_{0};
std::uint32_t stats_task_id_{0};
std::shared_ptr<CompletionQueue> completion_queue_;
std::unique_ptr<ThreadPoolImpl> callback_thread_pool_;
std::thread completion_queue_thread_;
Histogram latency_histogram_;
absl::flat_hash_set<std::string> exporter_endpoint_set_ GUARDED_BY(exporter_endpoint_set_mtx_);
absl::Mutex exporter_endpoint_set_mtx_;
/**
* Tenant-id. Each user shall have one unique identifier.
*/
std::string tenant_id_;
std::string service_name_{"MQ"};
/**
* TLS configuration
*/
std::shared_ptr<grpc::experimental::TlsServerAuthorizationCheckConfig> server_authorization_check_config_;
std::shared_ptr<grpc::experimental::CertificateProviderInterface> certificate_provider_;
grpc::experimental::TlsChannelCredentialsOptions tls_channel_credential_options_;
std::shared_ptr<grpc::ChannelCredentials> channel_credential_;
grpc::ChannelArguments channel_arguments_;
bool trace_{false};
};
ROCKETMQ_NAMESPACE_END