blob: 283d5bad23b76b5b109e004afeba1fd16d4252c2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "cluster_connection.h"
#include <ignite/network/codec.h>
#include <ignite/network/codec_data_filter.h>
#include <ignite/network/length_prefix_codec.h>
#include <ignite/network/network.h>
#include <ignite/protocol/writer.h>
#include <iterator>
namespace ignite::detail {
cluster_connection::cluster_connection(ignite_client_configuration configuration)
: m_configuration(std::move(configuration))
, m_pool()
, m_logger(m_configuration.get_logger())
, m_generator(std::random_device()()) {
}
void cluster_connection::start_async(std::function<void(ignite_result<void>)> callback) {
using namespace network;
if (m_pool)
throw ignite_error("Client is already started");
std::vector<tcp_range> addrs;
addrs.reserve(m_configuration.get_endpoints().size());
for (const auto &str_addr : m_configuration.get_endpoints()) {
std::optional<tcp_range> ep = tcp_range::parse(str_addr, DEFAULT_TCP_PORT);
if (!ep)
throw ignite_error("Can not parse address range: " + str_addr);
addrs.push_back(std::move(ep.value()));
}
data_filters filters;
std::shared_ptr<factory<codec>> codec_factory = std::make_shared<length_prefix_codec_factory>();
std::shared_ptr<codec_data_filter> codec_filter(new network::codec_data_filter(codec_factory));
filters.push_back(codec_filter);
m_pool = network::make_async_client_pool(filters);
m_pool->set_handler(shared_from_this());
m_on_initial_connect = std::move(callback);
m_pool->start(std::move(addrs), m_configuration.get_connection_limit());
}
void cluster_connection::stop() {
auto pool = m_pool;
if (pool)
pool->stop();
}
void cluster_connection::on_connection_success(const network::end_point &addr, uint64_t id) {
m_logger->log_info("Established connection with remote host " + addr.to_string());
m_logger->log_debug("Connection ID: " + std::to_string(id));
auto connection = node_connection::make_new(id, m_pool, m_logger);
{
[[maybe_unused]] std::unique_lock<std::recursive_mutex> lock(m_connections_mutex);
auto [_it, was_new] = m_connections.insert_or_assign(id, connection);
if (!was_new)
m_logger->log_error(
"Unknown error: connecting is already in progress. Connection ID: " + std::to_string(id));
}
try {
bool res = connection->handshake();
if (!res) {
m_logger->log_warning("Failed to send handshake request: Connection already closed.");
remove_client(id);
return;
}
m_logger->log_debug("Handshake sent successfully");
} catch (const ignite_error &err) {
m_logger->log_warning("Failed to send handshake request: " + err.what_str());
remove_client(id);
}
}
void cluster_connection::on_connection_error(const network::end_point &addr, ignite_error err) {
m_logger->log_warning(
"Failed to establish connection with remote host " + addr.to_string() + ", reason: " + err.what());
if (err.get_status_code() == status_code::OS)
initial_connect_result(std::move(err));
}
void cluster_connection::on_connection_closed(uint64_t id, std::optional<ignite_error> err) {
m_logger->log_debug("Closed Connection ID " + std::to_string(id) + ", error=" + (err ? err->what() : "none"));
remove_client(id);
}
void cluster_connection::on_message_received(uint64_t id, bytes_view msg) {
if (m_logger->is_debug_enabled())
m_logger->log_debug("Message on Connection ID " + std::to_string(id) + ", size: " + std::to_string(msg.size()));
std::shared_ptr<node_connection> connection = find_client(id);
if (!connection)
return;
if (connection->is_handshake_complete()) {
connection->process_message(msg);
return;
}
auto res = connection->process_handshake_rsp(msg);
if (res.has_error()) {
initial_connect_result(std::move(res));
remove_client(connection->id());
return;
}
auto &context = connection->get_protocol_context();
initial_connect_result(context);
if (context.get_cluster_id() != m_cluster_id) {
std::stringstream message;
message << "Node from unknown cluster: current_cluster_id=" << m_cluster_id
<< ", node_cluster_id=" << context.get_cluster_id();
m_logger->log_warning(message.str());
remove_client(connection->id());
}
}
std::shared_ptr<node_connection> cluster_connection::find_client(uint64_t id) {
[[maybe_unused]] std::unique_lock<std::recursive_mutex> lock(m_connections_mutex);
auto it = m_connections.find(id);
if (it != m_connections.end())
return it->second;
return {};
}
void cluster_connection::on_message_sent(uint64_t id) {
if (m_logger->is_debug_enabled())
m_logger->log_debug("Message sent successfully on Connection ID " + std::to_string(id));
}
void cluster_connection::remove_client(uint64_t id) {
[[maybe_unused]] std::unique_lock<std::recursive_mutex> lock(m_connections_mutex);
m_connections.erase(id);
}
void cluster_connection::initial_connect_result(ignite_result<void> &&res) {
[[maybe_unused]] std::lock_guard<std::mutex> lock(m_on_initial_connect_mutex);
if (!m_on_initial_connect)
return;
m_on_initial_connect(std::move(res));
m_on_initial_connect = {};
}
void cluster_connection::initial_connect_result(const protocol_context &context) {
[[maybe_unused]] std::lock_guard<std::mutex> lock(m_on_initial_connect_mutex);
if (!m_on_initial_connect)
return;
m_cluster_id = context.get_cluster_id();
m_on_initial_connect({});
m_on_initial_connect = {};
}
std::shared_ptr<node_connection> cluster_connection::get_random_channel() {
[[maybe_unused]] std::unique_lock<std::recursive_mutex> lock(m_connections_mutex);
if (m_connections.empty())
return {};
if (m_connections.size() == 1)
return m_connections.begin()->second;
std::uniform_int_distribution<size_t> distrib(0, m_connections.size() - 1);
auto idx = ptrdiff_t(distrib(m_generator));
return std::next(m_connections.begin(), idx)->second;
}
} // namespace ignite::detail