| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include "ignite/odbc/query/data_query.h" |
| #include "ignite/odbc/log.h" |
| #include "ignite/odbc/odbc_error.h" |
| #include "ignite/odbc/query/cursor.h" |
| #include "ignite/tuple/binary_tuple_builder.h" |
| |
| #include <memory> |
| #include <utility> |
| |
| namespace { |
| using namespace ignite; |
| |
| // TODO: IGNITE-19968 Avoid storing row columns in primitives, read them directly from binary tuple. |
| /** |
| * Put a primitive into a buffer. |
| * |
| * @param buffer ODBC buffer. |
| * @param value Value to put. |
| * @return Conversion result. |
| */ |
| conversion_result put_primitive_to_buffer(application_data_buffer &buffer, const primitive &value) { |
| if (value.is_null()) |
| return buffer.put_null(); |
| |
| switch (value.get_type()) { |
| case ignite_type::STRING: |
| return buffer.put_string(value.get<std::string>()); |
| |
| case ignite_type::INT8: |
| return buffer.put_int8(value.get<std::int8_t>()); |
| |
| case ignite_type::INT16: |
| return buffer.put_int16(value.get<std::int16_t>()); |
| |
| case ignite_type::INT32: |
| return buffer.put_int32(value.get<std::int32_t>()); |
| |
| case ignite_type::INT64: |
| return buffer.put_int64(value.get<std::int64_t>()); |
| |
| case ignite_type::DECIMAL: |
| return buffer.put_decimal(value.get<big_decimal>()); |
| |
| case ignite_type::FLOAT: |
| return buffer.put_float(value.get<float>()); |
| |
| case ignite_type::DOUBLE: |
| return buffer.put_double(value.get<double>()); |
| |
| case ignite_type::BOOLEAN: |
| return buffer.put_bool(value.get<bool>()); |
| |
| case ignite_type::UUID: |
| return buffer.put_uuid(value.get<uuid>()); |
| |
| case ignite_type::DATE: |
| return buffer.put_date(value.get<ignite_date>()); |
| |
| case ignite_type::TIMESTAMP: |
| return buffer.put_timestamp(value.get<ignite_timestamp>()); |
| |
| case ignite_type::TIME: |
| return buffer.put_time(value.get<ignite_time>()); |
| |
| case ignite_type::DATETIME: |
| return buffer.put_date_time(value.get<ignite_date_time>()); |
| |
| case ignite_type::BYTE_ARRAY: |
| return buffer.put_binary_data(value.get<std::vector<std::byte>>()); |
| |
| case ignite_type::PERIOD: |
| case ignite_type::DURATION: |
| default: |
| // TODO: IGNITE-19969 implement support for period, duration and big_integer |
| return conversion_result::AI_UNSUPPORTED_CONVERSION; |
| } |
| } |
| |
| std::vector<bytes_view> read_rows(protocol::reader &reader) { |
| auto size = reader.read_int32(); |
| |
| std::vector<bytes_view> rows; |
| rows.reserve(size); |
| |
| for (std::int32_t row_idx = 0; row_idx < size; ++row_idx) { |
| rows.emplace_back(reader.read_binary()); |
| } |
| |
| return rows; |
| } |
| |
| } // anonymous namespace |
| |
| namespace ignite { |
| |
| data_query::data_query(diagnosable_adapter &m_diag, sql_connection &m_connection, std::string sql, |
| parameter_set ¶ms, std::int32_t &timeout) |
| : query(m_diag, query_type::DATA) |
| , m_connection(m_connection) |
| , m_query(std::move(sql)) |
| , m_params(params) |
| , m_timeout(timeout) { |
| } |
| |
| data_query::~data_query() { |
| internal_close(); |
| } |
| |
| sql_result data_query::execute() { |
| internal_close(); |
| |
| return make_request_execute(); |
| } |
| |
| const protocol::column_meta_vector *data_query::get_meta() { |
| if (!m_result_meta_available) { |
| update_meta(); |
| |
| if (!m_result_meta_available) |
| return nullptr; |
| } |
| |
| return &m_result_meta; |
| } |
| |
| const sql_parameter *data_query::get_sql_param(std::int16_t idx) const { |
| if (idx > 0 && static_cast<std::size_t>(idx) <= m_params_meta.size()) |
| return &m_params_meta.at(idx - 1); |
| |
| return nullptr; |
| } |
| |
| sql_result data_query::fetch_next_row() { |
| if (!m_executed) { |
| m_diag.add_status_record(sql_state::SHY010_SEQUENCE_ERROR, "Query was not executed."); |
| |
| return sql_result::AI_ERROR; |
| } |
| |
| if (!m_has_rowset || !m_cursor) |
| return sql_result::AI_NO_DATA; |
| |
| m_cursor->next(m_result_meta); |
| |
| if (!has_more_rows()) |
| return sql_result::AI_NO_DATA; |
| |
| if (!m_cursor->has_data()) { |
| std::unique_ptr<result_page> page; |
| auto result = make_request_fetch(page); |
| |
| if (result != sql_result::AI_SUCCESS) |
| return result; |
| |
| m_cursor->update_data(std::move(page)); |
| m_cursor->next(m_result_meta); |
| } |
| |
| if (!m_cursor->has_data()) |
| return sql_result::AI_NO_DATA; |
| |
| return sql_result::AI_SUCCESS; |
| } |
| |
| sql_result data_query::fetch_next_row(column_binding_map &column_bindings) { |
| auto res = fetch_next_row(); |
| if (res != sql_result::AI_SUCCESS && res != sql_result::AI_SUCCESS_WITH_INFO) { |
| return res; |
| } |
| |
| auto row = m_cursor->get_row(); |
| assert(!row.empty()); |
| |
| for (std::size_t i = 0; i < row.size(); ++i) { |
| // Column indexing starts from 1 in ODBC. |
| auto column_idx = std::int32_t(i + 1); |
| auto it = column_bindings.find(column_idx); |
| if (it == column_bindings.end()) |
| continue; |
| |
| auto conv_res = put_primitive_to_buffer(it->second, row[i]); |
| auto result = process_conversion_result(conv_res, m_cursor->get_result_set_pos(), column_idx); |
| |
| if (result == sql_result::AI_ERROR) |
| return sql_result::AI_ERROR; |
| } |
| |
| return sql_result::AI_SUCCESS; |
| } |
| |
| sql_result data_query::get_column(std::uint16_t column_idx, application_data_buffer &buffer) { |
| if (!m_executed) { |
| m_diag.add_status_record(sql_state::SHY010_SEQUENCE_ERROR, "Query was not executed."); |
| |
| return sql_result::AI_ERROR; |
| } |
| |
| if (!m_has_rowset || !has_more_rows() || !m_cursor) |
| return sql_result::AI_NO_DATA; |
| |
| auto row = m_cursor->get_row(); |
| if (row.empty()) { |
| m_diag.add_status_record(sql_state::S24000_INVALID_CURSOR_STATE, |
| "Cursor is in a wrong position. " |
| "It is either have reached the end of the result set or no data was yet fetched."); |
| |
| return sql_result::AI_ERROR; |
| } |
| |
| auto conv_res = put_primitive_to_buffer(buffer, row[column_idx - 1]); |
| sql_result result = process_conversion_result(conv_res, m_cursor->get_result_set_pos(), column_idx); |
| |
| return result; |
| } |
| |
| sql_result data_query::close() { |
| return internal_close(); |
| } |
| |
| sql_result data_query::internal_close() { |
| if (!m_cursor) |
| return sql_result::AI_SUCCESS; |
| |
| sql_result result = sql_result::AI_SUCCESS; |
| |
| if (!is_closed_remotely()) |
| result = make_request_close(); |
| |
| if (result == sql_result::AI_SUCCESS) { |
| m_cursor.reset(); |
| m_rows_affected = -1; |
| m_executed = false; |
| } |
| |
| return result; |
| } |
| |
| bool data_query::is_data_available() const { |
| return m_has_more_pages || (m_cursor && m_cursor->has_data()); |
| } |
| |
| std::int64_t data_query::affected_rows() const { |
| return m_rows_affected; |
| } |
| |
| sql_result data_query::next_result_set() { |
| // TODO: IGNITE-19855 Multiple queries execution is not supported. |
| internal_close(); |
| return sql_result::AI_NO_DATA; |
| } |
| |
| sql_result data_query::make_request_execute() { |
| auto &schema = m_connection.get_schema(); |
| |
| bool single = m_params.get_param_set_size() <= 1; |
| auto success = m_diag.catch_errors([&] { |
| auto tx = m_connection.get_transaction_id(); |
| if (!tx && !m_connection.is_auto_commit()) { |
| // Starting transaction if it's not started already. |
| m_connection.transaction_start(); |
| |
| tx = m_connection.get_transaction_id(); |
| assert(tx); |
| } |
| |
| auto client_op = single ? protocol::client_operation::SQL_EXEC : protocol::client_operation::SQL_EXEC_BATCH; |
| |
| auto res = m_connection.sync_request_nothrow(client_op, [&](protocol::writer &writer) { |
| if (tx) |
| writer.write(*tx); |
| else |
| writer.write_nil(); |
| |
| writer.write(schema); |
| writer.write(m_connection.get_configuration().get_page_size().get_value()); |
| writer.write(std::int64_t(m_connection.get_timeout()) * 1000); |
| writer.write_nil(); // Session timeout (unused, session is closed by the server immediately). |
| |
| auto timezone = m_connection.get_configuration().get_timezone(); |
| if (timezone.is_set()) { |
| writer.write(timezone.get_value()); |
| } else { |
| writer.write_nil(); |
| } |
| |
| // Properties are not used for now. |
| writer.write(0); |
| binary_tuple_builder prop_builder{0}; |
| prop_builder.start(); |
| prop_builder.layout(); |
| auto prop_data = prop_builder.build(); |
| writer.write_binary(prop_data); |
| |
| writer.write(m_query); |
| |
| if (single) { |
| m_params.write(writer); |
| } else { |
| m_params.write(writer, 0, m_params.get_param_set_size(), true); |
| } |
| |
| writer.write(m_connection.get_observable_timestamp()); |
| }); |
| |
| // Check error |
| if (res.second) { |
| auto err = std::move(*res.second); |
| if (!single) { |
| auto affected_rows = err.get_cause()->get_extra<std::vector<std::int64_t>>( |
| protocol::error_extensions::SQL_UPDATE_COUNTERS); |
| if (affected_rows) { |
| process_affected_rows(*affected_rows); |
| } |
| } |
| |
| throw odbc_error{std::move(err)}; |
| } |
| |
| m_connection.mark_transaction_non_empty(); |
| |
| auto &response = res.first; |
| protocol::reader reader(response.get_bytes_view()); |
| m_query_id = reader.read_object_nullable<std::int64_t>(); |
| |
| m_has_rowset = reader.read_bool(); |
| m_has_more_pages = reader.read_bool(); |
| m_was_applied = reader.read_bool(); |
| |
| if (single) { |
| m_rows_affected = reader.read_int64(); |
| |
| if (m_has_rowset) { |
| auto columns = read_result_set_meta(reader); |
| set_resultset_meta(std::move(columns)); |
| auto rows = read_rows(reader); |
| |
| auto page = std::make_unique<result_page>(std::move(response), std::move(rows)); |
| m_cursor = std::make_unique<cursor>(std::move(page)); |
| } |
| |
| m_executed = true; |
| } else { |
| auto affected_rows = reader.read_int64_array(); |
| process_affected_rows(affected_rows); |
| } |
| }); |
| |
| return success ? sql_result::AI_SUCCESS : sql_result::AI_ERROR; |
| } |
| |
| void data_query::process_affected_rows(const std::vector<std::int64_t> &affected_rows) { |
| auto status_ptr = m_params.get_params_status_ptr(); |
| |
| m_rows_affected = 0; |
| for (auto &ar : affected_rows) { |
| m_rows_affected += ar; |
| } |
| m_params.set_params_processed(m_rows_affected); |
| |
| if (status_ptr) { |
| for (auto i = 0; i < m_params.get_param_set_size(); i++) { |
| status_ptr[i] = (size_t(i) < affected_rows.size()) ? SQL_PARAM_SUCCESS : SQL_PARAM_ERROR; |
| } |
| } |
| |
| m_executed = true; |
| } |
| |
| sql_result data_query::make_request_close() { |
| if (!m_query_id) |
| return sql_result::AI_SUCCESS; |
| |
| LOG_MSG("Closing cursor: " << *m_query_id); |
| |
| auto success = m_diag.catch_errors([&] { |
| UNUSED_VALUE m_connection.sync_request( |
| protocol::client_operation::SQL_CURSOR_CLOSE, [&](protocol::writer &writer) { writer.write(*m_query_id); }); |
| }); |
| |
| return success ? sql_result::AI_SUCCESS : sql_result::AI_ERROR; |
| } |
| |
| sql_result data_query::make_request_fetch(std::unique_ptr<result_page> &page) { |
| if (!m_query_id) { |
| m_diag.add_status_record(sql_state::SHY010_SEQUENCE_ERROR, "Cursor already closed"); |
| return sql_result::AI_ERROR; |
| } |
| |
| network::data_buffer_owning response; |
| auto success = m_diag.catch_errors([&] { |
| response = m_connection.sync_request(protocol::client_operation::SQL_CURSOR_NEXT_PAGE, |
| [&](protocol::writer &writer) { writer.write(*m_query_id); }); |
| |
| protocol::reader reader(response.get_bytes_view()); |
| auto rows = read_rows(reader); |
| m_has_more_pages = reader.read_bool(); |
| |
| page = std::make_unique<result_page>(std::move(response), std::move(rows)); |
| }); |
| |
| return success ? sql_result::AI_SUCCESS : sql_result::AI_ERROR; |
| } |
| |
| sql_result data_query::update_meta() { |
| auto &schema = m_connection.get_schema(); |
| |
| auto success = m_diag.catch_errors([&] { |
| auto tx = m_connection.get_transaction_id(); |
| auto response = |
| m_connection.sync_request(protocol::client_operation::SQL_QUERY_META, [&](protocol::writer &writer) { |
| if (tx) |
| writer.write(*tx); |
| else |
| writer.write_nil(); |
| |
| writer.write(schema); |
| writer.write(m_query); |
| }); |
| |
| if (tx) { |
| m_connection.mark_transaction_non_empty(); |
| } |
| |
| auto reader = std::make_unique<protocol::reader>(response.get_bytes_view()); |
| auto num = reader->read_int32(); |
| |
| if (num < 0) { |
| throw odbc_error( |
| sql_state::SHY000_GENERAL_ERROR, "Unexpected number of parameters: " + std::to_string(num)); |
| } |
| |
| std::vector<sql_parameter> params; |
| params.reserve(num); |
| |
| for (std::int32_t i = 0; i < num; ++i) { |
| sql_parameter param{}; |
| param.nullable = reader->read_bool(); |
| param.data_type = ignite_type(reader->read_int32()); |
| param.scale = reader->read_int32(); |
| param.precision = reader->read_int32(); |
| |
| params.emplace_back(param); |
| } |
| |
| set_params_meta(std::move(params)); |
| |
| auto columns = read_result_set_meta(*reader); |
| set_resultset_meta(std::move(columns)); |
| }); |
| |
| return success ? sql_result::AI_SUCCESS : sql_result::AI_ERROR; |
| } |
| |
| sql_result data_query::process_conversion_result( |
| conversion_result conv_res, std::int32_t row_idx, std::int32_t column_idx) { |
| switch (conv_res) { |
| case conversion_result::AI_SUCCESS: { |
| return sql_result::AI_SUCCESS; |
| } |
| |
| case conversion_result::AI_NO_DATA: { |
| return sql_result::AI_NO_DATA; |
| } |
| |
| case conversion_result::AI_VARLEN_DATA_TRUNCATED: { |
| m_diag.add_status_record(sql_state::S01004_DATA_TRUNCATED, |
| "Buffer is too small for the column data. Truncated from the right.", row_idx, column_idx); |
| |
| return sql_result::AI_SUCCESS_WITH_INFO; |
| } |
| |
| case conversion_result::AI_FRACTIONAL_TRUNCATED: { |
| m_diag.add_status_record(sql_state::S01S07_FRACTIONAL_TRUNCATION, |
| "Buffer is too small for the column data. Fraction truncated.", row_idx, column_idx); |
| |
| return sql_result::AI_SUCCESS_WITH_INFO; |
| } |
| |
| case conversion_result::AI_INDICATOR_NEEDED: { |
| m_diag.add_status_record(sql_state::S22002_INDICATOR_NEEDED, |
| "Indicator is needed but not supplied for the column buffer.", row_idx, column_idx); |
| |
| return sql_result::AI_SUCCESS_WITH_INFO; |
| } |
| |
| case conversion_result::AI_UNSUPPORTED_CONVERSION: { |
| m_diag.add_status_record(sql_state::SHYC00_OPTIONAL_FEATURE_NOT_IMPLEMENTED, |
| "Data conversion is not supported.", row_idx, column_idx); |
| |
| return sql_result::AI_SUCCESS_WITH_INFO; |
| } |
| |
| case conversion_result::AI_FAILURE: |
| default: { |
| m_diag.add_status_record( |
| sql_state::S01S01_ERROR_IN_ROW, "Can not retrieve row column.", row_idx, column_idx); |
| |
| break; |
| } |
| } |
| |
| return sql_result::AI_ERROR; |
| } |
| |
| void data_query::set_resultset_meta(protocol::column_meta_vector value) { |
| m_result_meta = std::move(value); |
| m_result_meta_available = true; |
| |
| for (size_t i = 0; i < m_result_meta.size(); ++i) { |
| protocol::column_meta &meta = m_result_meta.at(i); |
| LOG_MSG("[" << i << "] SchemaName: " << meta.get_schema_name()); |
| LOG_MSG("[" << i << "] TableName: " << meta.get_table_name()); |
| LOG_MSG("[" << i << "] ColumnName: " << meta.get_column_name()); |
| LOG_MSG("[" << i << "] ColumnType: " << static_cast<int32_t>(meta.get_data_type())); |
| } |
| } |
| |
| void data_query::set_params_meta(std::vector<sql_parameter> value) { |
| m_params_meta = std::move(value); |
| m_params_meta_available = true; |
| |
| for (size_t i = 0; i < m_params_meta.size(); ++i) { |
| sql_parameter &meta = m_params_meta.at(i); |
| LOG_MSG("[" << i << "] ParamType: " << meta.data_type); |
| LOG_MSG("[" << i << "] Scale: " << meta.scale); |
| LOG_MSG("[" << i << "] Precision: " << meta.precision); |
| LOG_MSG("[" << i << "] Nullable: " << (meta.nullable ? "true" : "false")); |
| } |
| } |
| } // namespace ignite |