blob: f33a94e999fe9a7f53c708f55c41e3cc0f3e5570 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
#pragma once
#include "ignite/protocol/protocol_context.h"
#include <ignite/client/detail/connection_event_handler.h>
#include <ignite/client/detail/response_handler.h>
#include <ignite/client/ignite_client_configuration.h>
#include <ignite/protocol/client_operation.h>
#include <ignite/common/utils.h>
#include <ignite/network/async_client_pool.h>
#include <ignite/protocol/reader.h>
#include <ignite/protocol/writer.h>
#include <atomic>
#include <future>
#include <memory>
#include <mutex>
#include <unordered_map>
namespace ignite::detail {
class cluster_connection;
* Represents connection to the cluster.
* Considered established while there is connection to at least one server.
class node_connection : public std::enable_shared_from_this<node_connection> {
friend class cluster_connection;
// Deleted
node_connection() = delete;
node_connection(node_connection &&) = delete;
node_connection(const node_connection &) = delete;
node_connection &operator=(node_connection &&) = delete;
node_connection &operator=(const node_connection &) = delete;
* Destructor.
* Makes new instance.
* @param id Connection ID.
* @param pool Connection pool.
* @param event_handler Event handler.
* @param logger Logger.
* @param cfg Configuration.
* @return New instance.
static std::shared_ptr<node_connection> make_new(std::uint64_t id, std::shared_ptr<network::async_client_pool> pool,
std::weak_ptr<connection_event_handler> event_handler, std::shared_ptr<ignite_logger> logger,
const ignite_client_configuration &cfg) {
return std::shared_ptr<node_connection>(
new node_connection(id, std::move(pool), std::move(event_handler), std::move(logger), cfg));
* Get connection ID.
* @return ID.
[[nodiscard]] std::uint64_t id() const { return m_id; }
* Check whether handshake complete.
* @return @c true if the handshake complete.
[[nodiscard]] bool is_handshake_complete() const { return m_handshake_complete; }
* Send request.
* @tparam T Result type.
* @param op Operation code.
* @param wr Writer function.
* @param handler response handler.
* @return @c true on success and @c false otherwise.
bool perform_request(protocol::client_operation op, const std::function<void(protocol::writer &)> &wr,
std::shared_ptr<response_handler> handler) {
auto req_id = generate_request_id();
std::vector<std::byte> message;
protocol::buffer_adapter buffer(message);
protocol::writer writer(buffer);
std::lock_guard<std::recursive_mutex> lock(m_request_handlers_mutex);
m_request_handlers[req_id] = std::move(handler);
if (m_logger->is_debug_enabled()) {
"Performing request: op=" + std::to_string(int(op)) + ", req_id=" + std::to_string(req_id));
bool sent = m_pool->send(m_id, std::move(message));
if (!sent) {
return false;
return true;
* Perform request.
* @tparam T Result type.
* @param op Operation code.
* @param wr Request writer function.
* @param rd response reader function.
* @param callback Callback to call on result.
* @return Channel used for the request.
template<typename T>
bool perform_request(protocol::client_operation op, const std::function<void(protocol::writer &)> &wr,
std::function<T(protocol::reader &)> rd, ignite_callback<T> callback) {
auto handler = std::make_shared<response_handler_reader<T>>(std::move(rd), std::move(callback));
return perform_request(op, wr, std::move(handler));
* Perform request without output data.
* @tparam T Result type.
* @param op Operation code.
* @param wr Request writer function.
* @param callback Callback to call on result.
* @return Channel used for the request.
template<typename T>
void perform_request_wr(
protocol::client_operation op, const std::function<void(protocol::writer &)> &wr, ignite_callback<T> callback) {
op, wr, [](protocol::reader &) {}, std::move(callback));
* Perform handshake.
* @return @c true on success and @c false otherwise.
bool handshake();
* Callback that called when new message is received.
* @param msg Received message.
void process_message(bytes_view msg);
* Process handshake response.
* @param msg Handshake response message.
ignite_result<void> process_handshake_rsp(bytes_view msg);
* Gets protocol context.
* @return Protocol context.
const protocol::protocol_context &get_protocol_context() const { return m_protocol_context; }
* Constructor.
* @param id Connection ID.
* @param pool Connection pool.
* @param event_handler Event handler.
* @param logger Logger.
* @param cfg Configuration.
node_connection(std::uint64_t id, std::shared_ptr<network::async_client_pool> pool,
std::weak_ptr<connection_event_handler> event_handler, std::shared_ptr<ignite_logger> logger,
const ignite_client_configuration &cfg);
* Generate next request ID.
* @return New request ID.
[[nodiscard]] std::int64_t generate_request_id() { return m_req_id_gen.fetch_add(1, std::memory_order_relaxed); }
* Get and remove request handler.
* @param reqId Request ID.
* @return Handler.
std::shared_ptr<response_handler> get_and_remove_handler(std::int64_t req_id);
* Find handler by ID.
* @warning Warning: m_request_handlers_mutex should be locked.
* @param reqId Request ID.
* @return Handler.
std::shared_ptr<response_handler> find_handler_unsafe(std::int64_t req_id);
* Notify event handler about observable timestamp change.
* @param observable_timestamp New observable timestamp.
void on_observable_timestamp_changed(int64_t observable_timestamp) const;
/** Handshake complete. */
bool m_handshake_complete{false};
/** Protocol context. */
protocol::protocol_context m_protocol_context;
/** Connection ID. */
std::uint64_t m_id{0};
/** Connection pool. */
std::shared_ptr<network::async_client_pool> m_pool;
/** Connection event handler. */
std::weak_ptr<connection_event_handler> m_event_handler;
/** Request ID generator. */
std::atomic_int64_t m_req_id_gen{0};
/** Pending request handlers. */
std::unordered_map<std::int64_t, std::shared_ptr<response_handler>> m_request_handlers;
/** Handlers map mutex. */
std::recursive_mutex m_request_handlers_mutex;
/** Logger. */
std::shared_ptr<ignite_logger> m_logger;
/** Configuration. */
const ignite_client_configuration &m_configuration;
} // namespace ignite::detail