blob: f9981bc150abfbcea0b7316eef30f4f6bc2066b4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "DynamicNameServerResolver.h"
#include <atomic>
#include <chrono>
#include <cstdint>
#include <functional>
#include <memory>
#include "absl/strings/str_join.h"
#include "LoggerImpl.h"
#include "SchedulerImpl.h"
ROCKETMQ_NAMESPACE_BEGIN
DynamicNameServerResolver::DynamicNameServerResolver(absl::string_view endpoint,
std::chrono::milliseconds refresh_interval)
: endpoint_(endpoint.data(), endpoint.length()), scheduler_(std::make_shared<SchedulerImpl>(1)),
refresh_interval_(refresh_interval) {
absl::string_view remains;
if (absl::StartsWith(endpoint_, "https://")) {
ssl_ = true;
remains = absl::StripPrefix(endpoint_, "https://");
} else {
remains = absl::StripPrefix(endpoint_, "http://");
}
std::int32_t port = 80;
if (ssl_) {
port = 443;
}
absl::string_view host;
if (absl::StrContains(remains, ':')) {
std::vector<absl::string_view> segments = absl::StrSplit(remains, ':');
host = segments[0];
remains = absl::StripPrefix(remains, host);
remains = absl::StripPrefix(remains, ":");
segments = absl::StrSplit(remains, '/');
if (!absl::SimpleAtoi(segments[0], &port)) {
SPDLOG_WARN("Failed to parse port of name-server-list discovery service endpoint");
abort();
}
remains = absl::StripPrefix(remains, segments[0]);
} else {
std::vector<absl::string_view> segments = absl::StrSplit(remains, '/');
host = segments[0];
remains = absl::StripPrefix(remains, host);
}
top_addressing_ = absl::make_unique<TopAddressing>(std::string(host.data(), host.length()), port,
std::string(remains.data(), remains.length()));
}
std::string DynamicNameServerResolver::resolve() {
bool fetch_immediately = false;
{
absl::MutexLock lk(&name_server_list_mtx_);
if (name_server_list_.empty()) {
fetch_immediately = true;
}
}
if (fetch_immediately) {
fetch();
}
{
absl::MutexLock lk(&name_server_list_mtx_);
return naming_scheme_.buildAddress(name_server_list_);
}
}
void DynamicNameServerResolver::fetch() {
std::weak_ptr<DynamicNameServerResolver> ptr(shared_from_this());
auto callback = [ptr](bool success, const std::vector<std::string>& name_server_list) {
if (success && !name_server_list.empty()) {
std::shared_ptr<DynamicNameServerResolver> resolver = ptr.lock();
if (resolver) {
resolver->onNameServerListFetched(name_server_list);
}
}
};
top_addressing_->fetchNameServerAddresses(callback);
}
void DynamicNameServerResolver::onNameServerListFetched(const std::vector<std::string>& name_server_list) {
if (!name_server_list.empty()) {
absl::MutexLock lk(&name_server_list_mtx_);
if (name_server_list_ != name_server_list) {
SPDLOG_INFO("Name server list changed. {} --> {}", absl::StrJoin(name_server_list_, ";"),
absl::StrJoin(name_server_list, ";"));
name_server_list_ = name_server_list;
}
}
}
void DynamicNameServerResolver::injectHttpClient(std::unique_ptr<HttpClient> http_client) {
top_addressing_->injectHttpClient(std::move(http_client));
}
void DynamicNameServerResolver::start() {
scheduler_->start();
scheduler_->schedule(std::bind(&DynamicNameServerResolver::fetch, this), "DynamicNameServerResolver",
std::chrono::milliseconds(0), refresh_interval_);
}
void DynamicNameServerResolver::shutdown() {
scheduler_->shutdown();
}
ROCKETMQ_NAMESPACE_END