blob: 7b729fa625e89d02a1658fb57768daadfbbb21fa [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "ignite/odbc/config/configuration.h"
#include "ignite/odbc/config/connection_info.h"
#include "ignite/odbc/diagnostic/diagnosable_adapter.h"
#include "ignite/odbc/odbc_error.h"
#include "ignite/protocol/protocol_version.h"
#include "ignite/network/data_buffer.h"
#include "ignite/network/socket_client.h"
#include "ignite/protocol/client_operation.h"
#include "ignite/protocol/writer.h"
#include <atomic>
#include <cstdint>
#include <vector>
namespace ignite {
class sql_environment;
class sql_statement;
/**
* ODBC node connection.
*/
class sql_connection : public diagnosable_adapter {
friend class sql_environment;
public:
/**
* Operation with timeout result.
*/
enum class operation_result { SUCCESS, FAIL, TIMEOUT };
/** Default connection timeout in seconds. */
enum { DEFAULT_CONNECT_TIMEOUT = 5 };
// Delete
sql_connection(sql_connection &&) = delete;
sql_connection(const sql_connection &) = delete;
sql_connection &operator=(sql_connection &&) = delete;
sql_connection &operator=(const sql_connection &) = delete;
/**
* Get connection info.
*
* @return Connection info.
*/
[[nodiscard]] const connection_info &get_info() const { return m_info; }
/**
* Get info of any type.
*
* @param type Info type.
* @param buf Result buffer pointer.
* @param buffer_len Result buffer length.
* @param result_len Result value length pointer.
*/
void get_info(connection_info::info_type type, void *buf, short buffer_len, short *result_len);
/**
* Establish connection to ODBC server.
*
* @param connectStr Connection string.
* @param parentWindow Parent window pointer.
*/
void establish(const std::string &connectStr, void *parentWindow);
/**
* Establish connection to ODBC server.
*
* @param cfg Configuration.
*/
void establish(const configuration &cfg);
/**
* Release established connection.
*
* @return Operation result.
*/
void release();
/**
* Deregister self from the parent.
*/
void deregister();
/**
* Create statement associated with the connection.
*
* @return Pointer to valid instance on success and NULL on failure.
*/
sql_statement *create_statement();
/**
* Send data by established connection.
* Uses connection timeout.
*
* @param data Data buffer.
* @param len Data length.
* @return @c true on success, @c false on timeout.
* @throw odbc_error on error.
*/
bool send(const std::byte *data, std::size_t len) { return send(data, len, m_timeout); }
/**
* Send data by established connection.
*
* @param data Data buffer.
* @param len Data length.
* @param timeout Timeout.
* @return @c true on success, @c false on timeout.
* @throw odbc_error on error.
*/
bool send(const std::byte *data, std::size_t len, std::int32_t timeout);
/**
* Receive next message.
*
* @param msg Buffer for message.
* @param timeout Timeout.
* @return @c true on success, @c false on timeout.
* @throw odbc_error on error.
*/
bool receive(std::vector<std::byte> &msg, std::int32_t timeout);
/**
* Receive next message.
*
* @param msg Buffer for message.
* @param timeout Timeout.
* @return @c true on success, @c false on timeout.
*/
bool receive_and_check_magic(std::vector<std::byte> &msg, std::int32_t timeout);
/**
* Synchronously send request message.
* Uses provided timeout.
*
* @param req Request message.
* @param timeout Timeout. 0 means disabled.
* @throw OdbcError on error.
*/
void send_message(bytes_view req, std::int32_t timeout);
/**
* Synchronously send request message.
*
* @param req Request message.
* @throw OdbcError on error.
*/
void send_message(bytes_view req) { send_message(req, m_timeout); }
/**
* Receive message.
*
* @param id Expected message ID.
* @param timeout Timeout.
* @return Message.
*/
network::data_buffer_owning receive_message(std::int64_t id, std::int32_t timeout);
/**
* Receive message.
*
* @param id Expected message ID.
* @param timeout Timeout.
* @return Message and error.
*/
std::pair<network::data_buffer_owning, std::optional<odbc_error>> receive_message_nothrow(std::int64_t id,
std::int32_t timeout);
/**
* Receive message.
*
* @param id Expected message ID.
* @return Message.
*/
network::data_buffer_owning receive_message(std::int64_t id) { return receive_message(id, m_timeout); }
/**
* Receive message.
*
* @param id Expected message ID.
* @return Message and error.
*/
std::pair<network::data_buffer_owning, std::optional<odbc_error>> receive_message_nothrow(std::int64_t id) {
return receive_message_nothrow(id, m_timeout);
}
/**
* Get configuration.
*
* @return Connection configuration.
*/
[[nodiscard]] const configuration &get_configuration() const;
/**
* Is auto commit.
*
* @return @c true if the auto commit is enabled.
*/
[[nodiscard]] bool is_auto_commit() const;
/**
* Perform transaction commit.
*/
void transaction_commit();
/**
* Perform transaction rollback.
*/
void transaction_rollback();
/**
* Start transaction.
*
* @return Operation result.
*/
void transaction_start();
/**
* Get connection attribute.
*
* @param attr Attribute type.
* @param buf Buffer for value.
* @param buf_len Buffer length.
* @param value_len Resulting value length.
*/
void get_attribute(int attr, void *buf, SQLINTEGER buf_len, SQLINTEGER *value_len);
/**
* Set connection attribute.
*
* @param attr Attribute type.
* @param value Value pointer.
* @param value_len Value length.
*/
void set_attribute(int attr, void *value, SQLINTEGER value_len);
/**
* Make new request.
*
* @param id Request ID.
* @param op Operation.
* @param func Function.
*/
[[nodiscard]] static std::vector<std::byte> make_request(
std::int64_t id, protocol::client_operation op, const std::function<void(protocol::writer &)> &func);
/**
* Get connection schema.
*
* @return Schema.
*/
const std::string &get_schema() const { return m_config.get_schema().get_value(); }
/**
* Get timeout.
*
* @return Timeout.
*/
std::int32_t get_timeout() const { return m_timeout; }
/**
* Make a synchronous request and get a response.
*
* @param op Operation.
* @param wr Payload writing function.
* @return Response.
*/
network::data_buffer_owning sync_request(
protocol::client_operation op, const std::function<void(protocol::writer &)> &wr) {
auto req_id = generate_next_req_id();
auto request = make_request(req_id, op, wr);
send_message(request);
return receive_message(req_id);
}
/**
* Make a synchronous request and get a response.
*
* @param op Operation.
* @param wr Payload writing function.
* @return Response and error.
*/
std::pair<network::data_buffer_owning, std::optional<odbc_error>> sync_request_nothrow(
protocol::client_operation op, const std::function<void(protocol::writer &)> &wr) {
auto req_id = generate_next_req_id();
auto request = make_request(req_id, op, wr);
send_message(request);
return receive_message_nothrow(req_id);
}
/**
* Get transaction ID.
*
* @return Transaction ID.
*/
[[nodiscard]] std::optional<std::int64_t> get_transaction_id() const { return m_transaction_id; }
/**
* Mark transaction non-empty.
*
* After this call connection assumes there is at least one operation performed with this transaction.
*/
void mark_transaction_non_empty() { m_transaction_empty = false; }
/**
* Get observable timestamp.
*
* @return Observable timestamp.
*/
std::int64_t get_observable_timestamp() const { return m_observable_timestamp.load(); }
private:
/**
* Generate and get next request ID.
*
* @return Request ID.
*/
std::int64_t generate_next_req_id() { return m_req_id_gen.fetch_add(1); }
/**
* Init connection socket, using configuration.
*
* @return Operation result.
*/
[[nodiscard]] sql_result init_socket();
/**
* Establish connection to ODBC server.
* Internal call.
*
* @param connectStr Connection string.
* @param parentWindow Parent window.
* @return Operation result.
*/
sql_result internal_establish(const std::string &connectStr, void *parentWindow);
/**
* Establish connection to ODBC server.
* Internal call.
*
* @param cfg Configuration.
* @return Operation result.
*/
sql_result internal_establish(const configuration &cfg);
/**
* Release established connection.
* Internal call.
*
* @return Operation result.
*/
sql_result internal_release();
/**
* Close connection.
*/
void close();
/**
* Get info of any type.
* Internal call.
*
* @param type Info type.
* @param buf Result buffer pointer.
* @param buffer_len Result buffer length.
* @param result_len Result value length pointer.
* @return Operation result.
*/
sql_result internal_get_info(connection_info::info_type type, void *buf, short buffer_len, short *result_len);
/**
* Create statement associated with the connection.
* Internal call.
*
* @param statement Pointer to valid instance on success and NULL on failure.
* @return Operation result.
*/
sql_result internal_create_statement(sql_statement *&statement);
/**
* Perform transaction commit on all the associated connections.
* Internal call.
*
* @return Operation result.
*/
sql_result internal_transaction_commit();
/**
* Perform transaction rollback on all the associated connections.
* Internal call.
*
* @return Operation result.
*/
sql_result internal_transaction_rollback();
/**
* Enable autocommit.
*
* @return Operation result.
*/
sql_result enable_autocommit();
/**
* Disable autocommit.
*
* @return Operation result.
*/
sql_result disable_autocommit();
/**
* Get connection attribute.
* Internal call.
*
* @param attr Attribute type.
* @param buf Buffer for value.
* @param buf_len Buffer length.
* @param value_len Resulting value length.
* @return Operation result.
*/
sql_result internal_get_attribute(int attr, void *buf, SQLINTEGER buf_len, SQLINTEGER *value_len);
/**
* Set connection attribute.
* Internal call.
*
* @param attr Attribute type.
* @param value Value pointer.
* @param value_len Value length.
* @return Operation result.
*/
sql_result internal_set_attribute(int attr, void *value, SQLINTEGER value_len);
/**
* Receive specified number of bytes.
*
* @param dst Buffer for data.
* @param len Number of bytes to receive.
* @param timeout Timeout.
* @return Operation result.
*/
operation_result receive_all(void *dst, std::size_t len, std::int32_t timeout);
/**
* Send specified number of bytes.
*
* @param data Data buffer.
* @param len Data length.
* @param timeout Timeout.
* @return Operation result.
*/
operation_result send_all(const std::byte *data, std::size_t len, std::int32_t timeout);
/**
* Perform handshake request.
*
* @return Operation result.
*/
sql_result make_request_handshake();
/**
* Ensure there is a connection to the cluster.
*
* @throw odbc_error on failure.
*/
void ensure_connected();
/**
* Try to restore connection to the cluster.
*
* @throw ignite_error on failure.
* @return @c true on success and @c false otherwise.
*/
bool try_restore_connection();
/**
* Safe connect.
*
* @param addr Address.
* @return @c true if connection was successful.
*/
bool safe_connect(const end_point &addr);
/**
* Retrieve timeout from parameter.
*
* @param value parameter.
* @return Timeout.
*/
std::int32_t retrieve_timeout(void *value);
/**
* Handle new observable timestamp.
*
* @param timestamp Timestamp.
*/
void on_observable_timestamp(std::int64_t timestamp);
/**
* Constructor.
*/
explicit sql_connection(sql_environment *env)
: m_env(env)
, m_config()
, m_info(m_config) {}
/** Parent. */
sql_environment *m_env;
/** Connection timeout in seconds. */
std::int32_t m_timeout{0};
/** Login timeout in seconds. */
std::int32_t m_login_timeout{DEFAULT_CONNECT_TIMEOUT};
/** Autocommit flag. */
bool m_auto_commit{true};
/** Current transaction ID. */
std::optional<std::int64_t> m_transaction_id;
/** Current transaction empty. */
bool m_transaction_empty{true};
/** Configuration. */
configuration m_config;
/** Connection info. */
connection_info m_info;
/** Socket client. */
std::unique_ptr<network::socket_client> m_socket;
/** Protocol version. */
protocol::protocol_version m_protocol_version;
/** Request ID generator. */
std::atomic_int64_t m_req_id_gen{0};
/** Observable timestamp. */
std::atomic_int64_t m_observable_timestamp{0};
};
} // namespace ignite