blob: 60a4049878c9aa41f8db0f5367f417e96c3193a0 [file] [log] [blame]
#include <algorithm>
#include <chrono>
#include <cstdint>
#include <cstdlib>
#include <functional>
#include <iterator>
#include <memory>
#include <string>
#include <system_error>
#include <utility>
#include "absl/strings/str_join.h"
#include "absl/strings/str_split.h"
#include "apache/rocketmq/v1/definition.pb.h"
#include "ClientImpl.h"
#include "ClientManagerFactory.h"
#include "HttpClientImpl.h"
#include "InvocationContext.h"
#include "LoggerImpl.h"
#include "MessageAccessor.h"
#include "Signature.h"
#include "rocketmq/MQMessageExt.h"
ROCKETMQ_NAMESPACE_BEGIN
ClientImpl::ClientImpl(absl::string_view group_name) : ClientConfigImpl(group_name), state_(State::CREATED) {}
void ClientImpl::start() {
State expected = CREATED;
if (!state_.compare_exchange_strong(expected, State::STARTING)) {
SPDLOG_WARN("Unexpected state: {}", state_.load(std::memory_order_relaxed));
return;
}
if (!name_server_resolver_) {
SPDLOG_ERROR("No name server resolver is configured.");
abort();
}
name_server_resolver_->start();
client_manager_ = ClientManagerFactory::getInstance().getClientManager(*this);
client_manager_->start();
exporter_ = std::make_shared<OtlpExporter>(client_manager_, this);
exporter_->start();
std::weak_ptr<ClientImpl> ptr(self());
auto route_update_functor = [ptr]() {
std::shared_ptr<ClientImpl> base = ptr.lock();
if (base) {
base->updateRouteInfo();
}
};
route_update_handle_ = client_manager_->getScheduler().schedule(route_update_functor, UPDATE_ROUTE_TASK_NAME,
std::chrono::seconds(10), std::chrono::seconds(30));
state_.store(State::STARTED);
}
void ClientImpl::shutdown() {
state_.store(State::STOPPING, std::memory_order_relaxed);
name_server_resolver_->shutdown();
if (route_update_handle_) {
client_manager_->getScheduler().cancel(route_update_handle_);
}
notifyClientTermination();
client_manager_.reset();
}
// Name under which the periodic route-update task is registered with the scheduler in start().
const char* ClientImpl::UPDATE_ROUTE_TASK_NAME{"route_updater"};
// Adds to `endpoints` the service address of every partition of every cached topic route.
// Existing entries in `endpoints` are kept; the route table is read under topic_route_table_mtx_.
void ClientImpl::endpointsInUse(absl::flat_hash_set<std::string>& endpoints) {
  absl::MutexLock lk(&topic_route_table_mtx_);
  for (const auto& item : topic_route_table_) {
    for (const auto& partition : item.second->partitions()) {
      // emplace() is already a no-op for an address that is present, so a separate contains() probe
      // would only cost a second hash lookup.
      endpoints.emplace(partition.asMessageQueue().serviceAddress());
    }
  }
}
void ClientImpl::getRouteFor(const std::string& topic,
const std::function<void(const std::error_code&, TopicRouteDataPtr)>& cb) {
TopicRouteDataPtr route = nullptr;
{
absl::MutexLock lock(&topic_route_table_mtx_);
if (topic_route_table_.contains(topic)) {
route = topic_route_table_.at(topic);
}
}
if (route) {
std::error_code ec;
cb(ec, route);
return;
}
bool query_backend = true;
{
absl::MutexLock lk(&inflight_route_requests_mtx_);
{
absl::MutexLock route_table_lock(&topic_route_table_mtx_);
if (topic_route_table_.contains(topic)) {
route = topic_route_table_.at(topic);
query_backend = false;
}
}
if (query_backend) {
if (inflight_route_requests_.contains(topic)) {
inflight_route_requests_.at(topic).emplace_back(cb);
SPDLOG_DEBUG("Would reuse prior route request for topic={}", topic);
return;
} else {
std::vector<std::function<void(const std::error_code&, const TopicRouteDataPtr&)>> inflight{cb};
inflight_route_requests_.insert({topic, inflight});
SPDLOG_INFO("Create inflight route query cache for topic={}", topic);
}
}
}
if (!query_backend && route) {
std::error_code ec;
cb(ec, route);
} else {
fetchRouteFor(topic,
std::bind(&ClientImpl::onTopicRouteReady, this, topic, std::placeholders::_1, std::placeholders::_2));
}
}
// Fills `endpoints` with the name-server addresses reported by name_server_resolver_->resolve().
// Each resolved item is expected as "host:port" (split on the last ':'). Items without a ':' or with a port that
// is not a valid TCP port (1-65535) are skipped. The address scheme is derived from the first accepted host:
// IPv4 literal, anything containing ':' (IPv6), otherwise a domain name. Nothing is written when no item is accepted.
void ClientImpl::setAccessPoint(rmq::Endpoints* endpoints) {
  std::vector<std::pair<std::string, std::uint16_t>> pairs;
  for (const auto& name_server_item : name_server_resolver_->resolve()) {
    std::string::size_type pos = name_server_item.rfind(':');
    if (std::string::npos == pos) {
      continue;
    }
    std::string host = name_server_item.substr(0, pos);
    std::string port = name_server_item.substr(pos + 1);

    // std::stoi would throw on a malformed port and silently truncate values that do not fit in 16 bits;
    // parse without exceptions and reject anything that is not a complete, in-range port number.
    char* parse_end = nullptr;
    long port_number = std::strtol(port.c_str(), &parse_end, 10);
    if (port.empty() || parse_end != port.c_str() + port.size() || port_number <= 0 || port_number > 65535) {
      SPDLOG_WARN("Skip name server address with invalid port: {}", name_server_item);
      continue;
    }
    pairs.emplace_back(std::move(host), static_cast<std::uint16_t>(port_number));
  }

  if (pairs.empty()) {
    return;
  }

  for (const auto& host_port : pairs) {
    // add_addresses() appends a message owned by the repeated field; no manual allocation needed.
    rmq::Address* address = endpoints->add_addresses();
    address->set_port(host_port.second);
    address->set_host(host_port.first);
  }

  const std::string& first_host = pairs.front().first;
  if (MixAll::isIPv4(first_host)) {
    endpoints->set_scheme(rmq::AddressScheme::IPv4);
  } else if (absl::StrContains(first_host, ':')) {
    endpoints->set_scheme(rmq::AddressScheme::IPv6);
  } else {
    endpoints->set_scheme(rmq::AddressScheme::DOMAIN_NAME);
  }
}
void ClientImpl::fetchRouteFor(const std::string& topic,
const std::function<void(const std::error_code&, const TopicRouteDataPtr&)>& cb) {
std::string name_server = name_server_resolver_->current();
if (name_server.empty()) {
SPDLOG_WARN("No name server available");
return;
}
auto callback = [this, topic, name_server, cb](const std::error_code& ec, const TopicRouteDataPtr& route) {
if (ec) {
SPDLOG_WARN("Failed to resolve route for topic={} from {}", topic, name_server);
std::string name_server_changed = name_server_resolver_->next();
if (!name_server_changed.empty()) {
SPDLOG_INFO("Change current name server from {} to {}", name_server, name_server_changed);
}
cb(ec, nullptr);
return;
}
SPDLOG_DEBUG("Apply callback of fetchRouteFor({}) since a valid route is fetched", topic);
cb(ec, route);
};
QueryRouteRequest request;
request.mutable_topic()->set_resource_namespace(resource_namespace_);
request.mutable_topic()->set_name(topic);
auto endpoints = request.mutable_endpoints();
setAccessPoint(endpoints);
absl::flat_hash_map<std::string, std::string> metadata;
Signature::sign(this, metadata);
client_manager_->resolveRoute(name_server, metadata, request, absl::ToChronoMilliseconds(io_timeout_), callback);
}
void ClientImpl::updateRouteInfo() {
if (State::STARTED != state_.load(std::memory_order_relaxed) &&
State::STARTING != state_.load(std::memory_order_relaxed)) {
SPDLOG_WARN("Unexpected client instance state={}.", state_.load(std::memory_order_relaxed));
return;
}
std::vector<std::string> topics;
{
absl::MutexLock lock(&topic_route_table_mtx_);
for (const auto& entry : topic_route_table_) {
topics.push_back(entry.first);
}
}
if (!topics.empty()) {
for (const auto& topic : topics) {
fetchRouteFor(
topic, std::bind(&ClientImpl::updateRouteCache, this, topic, std::placeholders::_1, std::placeholders::_2));
}
}
SPDLOG_DEBUG("Topic route info updated");
}
void ClientImpl::heartbeat() {
absl::flat_hash_set<std::string> hosts;
endpointsInUse(hosts);
if (hosts.empty()) {
SPDLOG_WARN("No hosts to send heartbeat to at present");
return;
}
HeartbeatRequest request;
prepareHeartbeatData(request);
absl::flat_hash_map<std::string, std::string> metadata;
Signature::sign(this, metadata);
for (const auto& target : hosts) {
auto callback = [target](const std::error_code& ec, const HeartbeatResponse& response) {
if (ec) {
SPDLOG_WARN("Failed to heartbeat against {}. Cause: {}", target, ec.message());
return;
}
SPDLOG_DEBUG("Heartbeat to {} OK", target);
};
client_manager_->heartbeat(target, metadata, request, absl::ToChronoMilliseconds(io_timeout_), callback);
}
}
void ClientImpl::onTopicRouteReady(const std::string& topic, const std::error_code& ec,
const TopicRouteDataPtr& route) {
if (route) {
SPDLOG_DEBUG("Received route data for topic={}", topic);
}
updateRouteCache(topic, ec, route);
// Take all pending callbacks
std::vector<std::function<void(const std::error_code&, const TopicRouteDataPtr&)>> pending_requests;
{
absl::MutexLock lk(&inflight_route_requests_mtx_);
assert(inflight_route_requests_.contains(topic));
auto& inflight_requests = inflight_route_requests_.at(topic);
pending_requests.insert(pending_requests.end(), inflight_requests.begin(), inflight_requests.end());
inflight_route_requests_.erase(topic);
}
SPDLOG_DEBUG("Apply cached callbacks with acquired route data for topic={}", topic);
for (const auto& cb : pending_requests) {
cb(ec, route);
}
}
// Stores `route` as the cached route of `topic` when it is valid and differs from the cached one.
// Errors, null routes and routes without partitions are logged and ignored; the existing entry, if any, is kept.
void ClientImpl::updateRouteCache(const std::string& topic, const std::error_code& ec, const TopicRouteDataPtr& route) {
  if (ec || !route || route->partitions().empty()) {
    SPDLOG_WARN("Yuck! route for {} is invalid. Cause: {}", topic, ec.message());
    return;
  }

  absl::MutexLock lk(&topic_route_table_mtx_);
  // One lookup serves both the "absent" and the "changed" cases (previously contains + at + insert_or_assign).
  auto it = topic_route_table_.find(topic);
  if (it == topic_route_table_.end()) {
    topic_route_table_.insert({topic, route});
    SPDLOG_INFO("TopicRouteData for topic={} has changed. NONE --> {}", topic, route->debugString());
    return;
  }

  // Hold on to the previous route so it can still be logged after the slot is overwritten.
  TopicRouteDataPtr cached = it->second;
  if (*cached != *route) {
    it->second = route;
    SPDLOG_INFO("TopicRouteData for topic={} has changed. {} --> {}", topic, cached->debugString(),
                route->debugString());
  }
}
void ClientImpl::multiplexing(const std::string& target, const MultiplexingRequest& request) {
absl::flat_hash_map<std::string, std::string> metadata;
Signature::sign(this, metadata);
client_manager_->multiplexingCall(target, metadata, request, absl::ToChronoMilliseconds(long_polling_timeout_),
std::bind(&ClientImpl::onMultiplexingResponse, this, std::placeholders::_1));
}
void ClientImpl::onMultiplexingResponse(const InvocationContext<MultiplexingResponse>* ctx) {
if (!ctx->status.ok()) {
std::string remote_address = ctx->remote_address;
auto multiplexingLater = [this, remote_address]() {
MultiplexingRequest request;
fillGenericPollingRequest(request);
multiplexing(remote_address, request);
};
static std::string task_name = "Initiate multiplex request later";
client_manager_->getScheduler().schedule(multiplexingLater, task_name, std::chrono::seconds(3),
std::chrono::seconds(0));
return;
}
switch (ctx->response.type_case()) {
case MultiplexingResponse::TypeCase::kPrintThreadStackRequest: {
MultiplexingRequest request;
request.mutable_print_thread_stack_response()->mutable_common()->mutable_status()->set_code(
google::rpc::Code::UNIMPLEMENTED);
request.mutable_print_thread_stack_response()->set_stack_trace(
"Print thread stack trace is not supported by C++ SDK");
absl::flat_hash_map<std::string, std::string> metadata;
Signature::sign(this, metadata);
client_manager_->multiplexingCall(ctx->remote_address, metadata, request, absl::ToChronoMilliseconds(io_timeout_),
std::bind(&ClientImpl::onMultiplexingResponse, this, std::placeholders::_1));
break;
}
case MultiplexingResponse::TypeCase::kVerifyMessageConsumptionRequest: {
auto data = ctx->response.verify_message_consumption_request().message();
MQMessageExt message;
MultiplexingRequest request;
if (!client_manager_->wrapMessage(data, message)) {
SPDLOG_WARN("Message to verify consumption is corrupted");
request.mutable_verify_message_consumption_response()->mutable_common()->mutable_status()->set_code(
google::rpc::Code::INVALID_ARGUMENT);
request.mutable_verify_message_consumption_response()->mutable_common()->mutable_status()->set_message(
"Message to verify is corrupted");
multiplexing(ctx->remote_address, request);
return;
}
std::string&& result = verifyMessageConsumption(message);
request.mutable_verify_message_consumption_response()->mutable_common()->mutable_status()->set_code(
google::rpc::Code::OK);
request.mutable_verify_message_consumption_response()->mutable_common()->mutable_status()->set_message(result);
multiplexing(ctx->remote_address, request);
break;
}
case MultiplexingResponse::TypeCase::kResolveOrphanedTransactionRequest: {
auto orphan = ctx->response.resolve_orphaned_transaction_request().orphaned_transactional_message();
MQMessageExt message;
if (client_manager_->wrapMessage(orphan, message)) {
MessageAccessor::setTargetEndpoint(message, ctx->remote_address);
const std::string& transaction_id = ctx->response.resolve_orphaned_transaction_request().transaction_id();
resolveOrphanedTransactionalMessage(transaction_id, message);
} else {
SPDLOG_WARN("Failed to resolve orphaned transactional message, potentially caused by message-body checksum "
"verification failure.");
}
MultiplexingRequest request;
fillGenericPollingRequest(request);
multiplexing(ctx->remote_address, request);
break;
}
case MultiplexingResponse::TypeCase::kPollingResponse: {
MultiplexingRequest request;
fillGenericPollingRequest(request);
multiplexing(ctx->remote_address, request);
break;
}
default: {
SPDLOG_WARN("Unsupported multiplex type");
MultiplexingRequest request;
fillGenericPollingRequest(request);
multiplexing(ctx->remote_address, request);
break;
}
}
}
// Drops from the isolated-endpoint pool every entry listed in `hosts`.
// Those endpoints have left the route table, so there is no point in health-checking them any longer.
void ClientImpl::onRemoteEndpointRemoval(const std::vector<std::string>& hosts) {
  absl::MutexLock lk(&isolated_endpoints_mtx_);
  // One hash-set erase per removed host, instead of a linear scan of `hosts` for every isolated endpoint.
  for (const auto& host : hosts) {
    if (isolated_endpoints_.erase(host) > 0) {
      SPDLOG_INFO("Drop isolated-endoint[{}] as it has been removed from route table", host);
    }
  }
}
// Sends a health-check request to every endpoint currently in the isolated pool.
// Replies go to onHealthCheckResponse(), but only while this client is still alive (weak reference).
void ClientImpl::healthCheck() {
  // Copy the isolated endpoints so no lock is held while RPCs are issued.
  std::vector<std::string> endpoints;
  {
    absl::MutexLock lk(&isolated_endpoints_mtx_);
    endpoints.assign(isolated_endpoints_.begin(), isolated_endpoints_.end());
  }

  std::weak_ptr<ClientImpl> weak_self(self());
  auto callback = [weak_self](const std::error_code& ec,
                              const InvocationContext<HealthCheckResponse>* invocation_context) {
    std::shared_ptr<ClientImpl> client = weak_self.lock();
    if (client == nullptr) {
      SPDLOG_INFO("BaseImpl has been destructed");
      return;
    }
    client->onHealthCheckResponse(ec, invocation_context);
  };

  for (const std::string& endpoint : endpoints) {
    HealthCheckRequest request;
    absl::flat_hash_map<std::string, std::string> metadata;
    Signature::sign(this, metadata);
    client_manager_->healthCheck(endpoint, metadata, request, absl::ToChronoMilliseconds(io_timeout_), callback);
  }
}
// Hands `task` to the client manager's scheduler under `task_name`, to run after `delay`.
// The final scheduler argument is zero — presumably meaning "do not repeat"; confirm against the scheduler API.
void ClientImpl::schedule(const std::string& task_name, const std::function<void()>& task,
                          std::chrono::milliseconds delay) {
  const std::chrono::milliseconds interval{0};
  auto& scheduler = client_manager_->getScheduler();
  scheduler.schedule(task, task_name, delay, interval);
}
// Handles the reply to a health check: on success the endpoint is removed from the isolated pool; on failure it
// stays isolated and a warning is logged.
void ClientImpl::onHealthCheckResponse(const std::error_code& ec, const InvocationContext<HealthCheckResponse>* ctx) {
  if (ec) {
    // The format string has two placeholders. Previously only the cause was supplied, so fmt reported a format
    // error instead of emitting this warning. Pass the endpoint too, tolerating a missing context on failure.
    SPDLOG_WARN("Health check to server[host={}] failed. Cause: {}", ctx ? ctx->remote_address : std::string("unknown"),
                ec.message());
    return;
  }
  SPDLOG_INFO("Health check to server[host={}] passed. Remove it from isolated endpoint pool", ctx->remote_address);
  {
    absl::MutexLock lk(&isolated_endpoints_mtx_);
    isolated_endpoints_.erase(ctx->remote_address);
  }
}
// Populates the polling_request part of `request` with this client's id, its group and its topics.
// PUBLISHER bundles fill producer_group and SUBSCRIBER bundles fill consumer_group; each topic in the resource
// bundle is added under resource_namespace_.
void ClientImpl::fillGenericPollingRequest(MultiplexingRequest& request) {
  auto&& bundle = resourceBundle();
  auto* polling = request.mutable_polling_request();
  polling->set_client_id(clientId());

  switch (bundle.group_type) {
    case GroupType::PUBLISHER: {
      auto* producer_group = polling->mutable_producer_group();
      producer_group->set_resource_namespace(resource_namespace_);
      producer_group->set_name(group_name_);
      break;
    }
    case GroupType::SUBSCRIBER: {
      auto* consumer_group = polling->mutable_consumer_group();
      consumer_group->set_resource_namespace(resource_namespace_);
      consumer_group->set_name(group_name_);
      break;
    }
  }

  for (const auto& topic_name : bundle.topics) {
    // add_topics() appends an element owned by the repeated field.
    auto* topic = polling->add_topics();
    topic->set_resource_namespace(resource_namespace_);
    topic->set_name(topic_name);
  }
}
void ClientImpl::notifyClientTermination() {
absl::flat_hash_set<std::string> endpoints;
endpointsInUse(endpoints);
Metadata metadata;
Signature::sign(this, metadata);
NotifyClientTerminationRequest request;
request.mutable_group()->set_resource_namespace(resource_namespace_);
request.mutable_group()->set_name(group_name_);
request.set_client_id(clientId());
for (const auto& endpoint : endpoints) {
client_manager_->notifyClientTermination(endpoint, metadata, request, absl::ToChronoMilliseconds(io_timeout_));
}
}
ROCKETMQ_NAMESPACE_END