// blob: d75912d1eae789026507d6cde716695ff90353b1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "ignite/client/detail/connection_event_handler.h"
#include "ignite/client/detail/node_connection.h"
#include "ignite/client/detail/response_handler.h"
#include "ignite/client/detail/transaction/transaction_impl.h"
#include "ignite/client/ignite_client_configuration.h"
#include "ignite/protocol/protocol_context.h"
#include "ignite/common/ignite_result.h"
#include "ignite/common/detail/thread_timer.h"
#include "ignite/network/async_client_pool.h"
#include "ignite/protocol/client_operation.h"
#include "ignite/protocol/reader.h"
#include "ignite/protocol/writer.h"
#include <atomic>
#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <optional>
#include <random>
#include <unordered_map>
#include <utility>
namespace ignite::protocol {
// Forward declaration of the protocol reader type that is referenced by the
// reader callback signatures declared further down in this header.
class reader;
} // namespace ignite::protocol
namespace ignite::detail {
/**
 * Represents connection to the cluster.
 *
 * Considered established while there is connection to at least one server.
 *
 * Instances are obtained only through create(), which returns a std::shared_ptr: the constructor is private and
 * the type is neither copyable nor movable. The destructor calls stop().
 */
class cluster_connection : public std::enable_shared_from_this<cluster_connection>,
                           public network::async_handler,
                           public connection_event_handler {
public:
    /** Function that reads a value of type T from a protocol reader. */
    template<typename T>
    using reader_function_type = std::function<T(protocol::reader &)>;

    /** Function that writes a request using a protocol writer and a protocol context. */
    using writer_function_type = std::function<void(protocol::writer &, const protocol::protocol_context &)>;

    /** Function that provides the operation code for a given protocol context. */
    using operation_function_type = std::function<protocol::client_operation(const protocol::protocol_context &)>;

    /** Default TCP port. */
    static constexpr uint16_t DEFAULT_TCP_PORT = protocol::protocol_context::DEFAULT_TCP_PORT;

    /**
     * Create a new instance of the object.
     *
     * @param configuration Configuration.
     * @return New instance.
     */
    static std::shared_ptr<cluster_connection> create(ignite_client_configuration configuration) {
        // std::make_shared cannot be used here because the constructor is private.
        return std::shared_ptr<cluster_connection>(new cluster_connection(std::move(configuration)));
    }

    // Deleted
    cluster_connection() = delete;
    cluster_connection(cluster_connection &&) = delete;
    cluster_connection(const cluster_connection &) = delete;
    cluster_connection &operator=(cluster_connection &&) = delete;
    cluster_connection &operator=(const cluster_connection &) = delete;

    /**
     * Destructor. Stops the connection.
     */
    ~cluster_connection() override { stop(); }

    /**
     * Start establishing connection.
     *
     * @param callback Callback.
     */
    void start_async(std::function<void(ignite_result<void>)> callback);

    /**
     * Stop connection.
     */
    void stop();

    /**
     * Perform request using an explicitly provided response handler.
     *
     * @param op_func Function that provides operation code.
     * @param tx Transaction.
     * @param wr Request writer function.
     * @param handler Request handler.
     * @return A connection used to perform request and the request ID.
     */
    std::pair<std::shared_ptr<node_connection>, std::int64_t> perform_request_handler(
        const operation_function_type &op_func, transaction_impl *tx, const writer_function_type &wr,
        const std::shared_ptr<response_handler> &handler);

    /**
     * Perform request raw. The callback receives the result as a bytes_view.
     *
     * @param op Operation code.
     * @param tx Transaction.
     * @param wr Request writer function.
     * @param callback Callback to call on a result.
     */
    void perform_request_raw(protocol::client_operation op, transaction_impl *tx, const writer_function_type &wr,
        ignite_callback<bytes_view> callback);

    /**
     * Perform request whose response is handed to the reader function as a bytes_view.
     *
     * @tparam T Result type.
     * @param op Operation code.
     * @param tx Transaction.
     * @param wr Request writer function.
     * @param rd Response reader function. Receives a node connection and the response bytes.
     * @param callback Callback to call on a result.
     * @return A connection used to perform request and the request ID.
     */
    template<typename T>
    std::pair<std::shared_ptr<node_connection>, std::int64_t> perform_request_bytes(protocol::client_operation op,
        transaction_impl *tx, const writer_function_type &wr,
        std::function<T(std::shared_ptr<node_connection>, bytes_view)> rd, ignite_callback<T> callback) {
        auto handler = std::make_shared<response_handler_bytes<T>>(std::move(rd), std::move(callback));
        return perform_request_handler(static_op(op), tx, wr, std::move(handler));
    }

    /**
     * Perform request.
     *
     * @tparam T Result type.
     * @param op Operation code.
     * @param tx Transaction.
     * @param wr Request writer function.
     * @param rd Response reader function.
     * @param callback Callback to call on a result.
     * @return A connection used to perform request and the request ID.
     */
    template<typename T>
    std::pair<std::shared_ptr<node_connection>, std::int64_t> perform_request(protocol::client_operation op,
        transaction_impl *tx, const writer_function_type &wr, reader_function_type<T> rd,
        ignite_callback<T> callback) {
        auto handler = std::make_shared<response_handler_reader<T>>(std::move(rd), std::move(callback));
        return perform_request_handler(static_op(op), tx, wr, std::move(handler));
    }

    /**
     * Perform request without a transaction.
     *
     * @tparam T Result type.
     * @param op Operation code.
     * @param wr Request writer function.
     * @param rd Response reader function.
     * @param callback Callback to call on a result.
     */
    template<typename T>
    void perform_request(protocol::client_operation op, const writer_function_type &wr, reader_function_type<T> rd,
        ignite_callback<T> callback) {
        auto handler = std::make_shared<response_handler_reader<T>>(std::move(rd), std::move(callback));
        perform_request_handler(static_op(op), nullptr, wr, std::move(handler));
    }

    /**
     * Perform request without a transaction. The reader function also receives a node connection.
     *
     * @tparam T Result type.
     * @param op Operation code.
     * @param wr Request writer function.
     * @param rd Response reader function.
     * @param callback Callback to call on a result.
     */
    template<typename T>
    void perform_request(protocol::client_operation op, const writer_function_type &wr,
        std::function<T(protocol::reader &, std::shared_ptr<node_connection>)> rd, ignite_callback<T> callback) {
        auto handler = std::make_shared<response_handler_reader_connection<T>>(std::move(rd), std::move(callback));
        perform_request_handler(static_op(op), nullptr, wr, std::move(handler));
    }

    /**
     * Perform request without a transaction. The operation code is provided by a function and the reader
     * function also receives a node connection.
     *
     * @tparam T Result type.
     * @param op_func Function that provides operation code.
     * @param wr Request writer function.
     * @param rd Response reader function.
     * @param callback Callback to call on a result.
     */
    template<typename T>
    void perform_request(const operation_function_type &op_func, const writer_function_type &wr,
        std::function<T(protocol::reader &, std::shared_ptr<node_connection>)> rd, ignite_callback<T> callback) {
        auto handler = std::make_shared<response_handler_reader_connection<T>>(std::move(rd), std::move(callback));
        perform_request_handler(op_func, nullptr, wr, std::move(handler));
    }

    /**
     * Perform request without input data (a no-op writer is used).
     *
     * @tparam T Result type.
     * @param op Operation code.
     * @param rd Response reader function.
     * @param callback Callback to call on a result.
     */
    template<typename T>
    void perform_request_rd(protocol::client_operation op, reader_function_type<T> rd, ignite_callback<T> callback) {
        perform_request<T>(op, [](auto &, auto &) {}, std::move(rd), std::move(callback));
    }

    /**
     * Perform request without input data (a no-op writer is used). The reader function also receives a node
     * connection.
     *
     * @tparam T Result type.
     * @param op Operation code.
     * @param rd Response reader function.
     * @param callback Callback to call on a result.
     */
    template<typename T>
    void perform_request_rd(protocol::client_operation op,
        std::function<T(protocol::reader &, std::shared_ptr<node_connection>)> rd, ignite_callback<T> callback) {
        // The wrapped operation selects the overload that takes an operation function.
        perform_request<T>(static_op(op), [](auto &, auto &) {}, std::move(rd), std::move(callback));
    }

    /**
     * Perform request without output data (a no-op reader is used for the response).
     *
     * @tparam T Result type.
     * @param op Operation code.
     * @param wr Request writer function.
     * @param callback Callback to call on a result.
     */
    template<typename T>
    void perform_request_wr(
        protocol::client_operation op, const writer_function_type &wr, ignite_callback<T> callback) {
        // Pass the operation code directly. The no-op reader accepts only a protocol::reader, so it matches
        // the reader_function_type<T> overload. Wrapping the code with static_op() would leave only the
        // overload whose reader additionally takes a node connection, and this lambda cannot be converted
        // to that reader type, so the call would not compile once instantiated.
        perform_request<T>(op, wr, [](protocol::reader &) {}, std::move(callback));
    }

    /**
     * Perform request without output data (a no-op reader is used for the response).
     *
     * @tparam T Result type.
     * @param op Operation code.
     * @param tx Transaction.
     * @param wr Request writer function.
     * @param callback Callback to call on a result.
     * @return A connection used to perform request and the request ID.
     */
    template<typename T>
    std::pair<std::shared_ptr<node_connection>, std::int64_t> perform_request_wr(protocol::client_operation op,
        transaction_impl *tx, const writer_function_type &wr, ignite_callback<T> callback) {
        return perform_request<T>(op, tx, wr, [](protocol::reader &) {}, std::move(callback));
    }

    /**
     * Get observable timestamp.
     *
     * @return Observable timestamp.
     */
    std::int64_t get_observable_timestamp() const { return m_observable_timestamp.load(); }

    /**
     * @param op Operation code to return.
     * @return A function that always returns the same operation.
     */
    [[nodiscard]] static operation_function_type static_op(protocol::client_operation op) {
        // The context is taken by const reference: a by-value parameter would copy the protocol context
        // on every invocation even though it is never used.
        return [op](const protocol::protocol_context &) { return op; };
    }

private:
    /**
     * Get a random node connection.
     *
     * @return Random node connection or nullptr if there are no active connections.
     */
    std::shared_ptr<node_connection> get_random_connected_channel();

    /**
     * Constructor.
     *
     * @param configuration Configuration.
     */
    explicit cluster_connection(ignite_client_configuration configuration);

    /**
     * Callback that called on successful connection establishment.
     *
     * @param addr Address of the new connection.
     * @param id Connection ID.
     */
    void on_connection_success(const end_point &addr, uint64_t id) override;

    /**
     * Callback that called on error during a connection establishment.
     *
     * @param addr Connection address.
     * @param err Error.
     */
    void on_connection_error(const end_point &addr, ignite_error err) override;

    /**
     * Callback that called when a connection is closed.
     *
     * @param id Async client ID.
     * @param err Error. Can be empty if connection closed without an error.
     */
    void on_connection_closed(uint64_t id, std::optional<ignite_error> err) override;

    /**
     * Callback that called when a new message is received.
     *
     * @param id Async client ID.
     * @param msg Received message.
     */
    void on_message_received(uint64_t id, bytes_view msg) override;

    /**
     * Callback that called when a message is sent.
     *
     * @param id Async client ID.
     */
    void on_message_sent(uint64_t id) override;

    /**
     * Handle observable timestamp.
     *
     * @param timestamp Timestamp.
     */
    void on_observable_timestamp_changed(std::int64_t timestamp) override;

    /**
     * Remove client.
     *
     * @param id Connection ID.
     */
    void remove_client(uint64_t id);

    /**
     * Handle a failed initial connection result.
     *
     * @param res Connect result.
     */
    void initial_connect_result(ignite_result<void> &&res);

    /**
     * Handle successful initial connection result.
     *
     * @param context Protocol context.
     */
    void initial_connect_result(const protocol::protocol_context &context);

    /**
     * Find and return client.
     *
     * @param id Client ID.
     * @return Client if found and nullptr otherwise.
     */
    [[nodiscard]] std::shared_ptr<node_connection> find_client(uint64_t id);

    /** Configuration. */
    const ignite_client_configuration m_configuration;

    /** Callback to call on initial connecting. */
    std::function<void(ignite_result<void>)> m_on_initial_connect;

    /** Cluster ID. */
    std::optional<uuid> m_cluster_id;

    /** Initial connect mutex. */
    std::mutex m_on_initial_connect_mutex;

    /** Connection pool. */
    std::shared_ptr<network::async_client_pool> m_pool;

    /** Logger. */
    std::shared_ptr<ignite_logger> m_logger;

    /** Pending node connections. */
    std::unordered_map<uint64_t, std::shared_ptr<node_connection>> m_pending_connections;

    /** Node connections. */
    std::unordered_map<uint64_t, std::shared_ptr<node_connection>> m_connections;

    /** Connections mutex. */
    std::recursive_mutex m_connections_mutex;

    /** Generator. */
    std::mt19937 m_generator;

    /** Observable timestamp. */
    std::atomic_int64_t m_observable_timestamp{0};

    /** Timer thread. */
    std::shared_ptr<thread_timer> m_timer_thread;
};
} // namespace ignite::detail