blob: ee169c237d360f8a206cee0e70d08ccb6ebae05c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "utils.h"
#include <ignite/client/detail/client_error_flags.h>
#include "ignite/common/detail/bits.h"
#include <ignite/common/uuid.h>
#include <ignite/protocol/utils.h>
#include <string>
namespace ignite::detail {
/**
* Claim space for the column.
*
* @param builder Binary tuple builder.
* @param typ Column type.
* @param value Value.
* @param scale Column scale.
*/
void claim_column(binary_tuple_builder &builder, ignite_type typ, const primitive &value, std::int32_t scale) {
switch (typ) {
case ignite_type::BOOLEAN:
builder.claim_bool(value.get<bool>());
break;
case ignite_type::INT8:
builder.claim_int8(value.get<std::int8_t>());
break;
case ignite_type::INT16:
builder.claim_int16(value.get<std::int16_t>());
break;
case ignite_type::INT32:
builder.claim_int32(value.get<std::int32_t>());
break;
case ignite_type::INT64:
builder.claim_int64(value.get<std::int64_t>());
break;
case ignite_type::FLOAT:
builder.claim_float(value.get<float>());
break;
case ignite_type::DOUBLE:
builder.claim_double(value.get<double>());
break;
case ignite_type::UUID:
builder.claim_uuid(value.get<uuid>());
break;
case ignite_type::STRING:
builder.claim_varlen(value.get<std::string>());
break;
case ignite_type::BYTE_ARRAY:
builder.claim_varlen(value.get<std::vector<std::byte>>());
break;
case ignite_type::DECIMAL: {
big_decimal to_write;
value.get<big_decimal>().set_scale(scale, to_write);
builder.claim_number(to_write);
break;
}
case ignite_type::DATE:
builder.claim_date(value.get<ignite_date>());
break;
case ignite_type::TIME:
builder.claim_time(value.get<ignite_time>());
break;
case ignite_type::DATETIME:
builder.claim_date_time(value.get<ignite_date_time>());
break;
case ignite_type::TIMESTAMP:
builder.claim_timestamp(value.get<ignite_timestamp>());
break;
case ignite_type::PERIOD:
builder.claim_period(value.get<ignite_period>());
break;
case ignite_type::DURATION:
builder.claim_duration(value.get<ignite_duration>());
break;
default:
throw ignite_error("Type with id " + std::to_string(int(typ)) + " is not yet supported");
}
}
/**
* Append column value to binary tuple.
*
* @param builder Binary tuple builder.
* @param typ Column type.
* @param value Value.
* @param scale Column scale.
*/
void append_column(binary_tuple_builder &builder, ignite_type typ, const primitive &value, std::int32_t scale) {
switch (typ) {
case ignite_type::BOOLEAN:
builder.append_bool(value.get<bool>());
break;
case ignite_type::INT8:
builder.append_int8(value.get<std::int8_t>());
break;
case ignite_type::INT16:
builder.append_int16(value.get<std::int16_t>());
break;
case ignite_type::INT32:
builder.append_int32(value.get<std::int32_t>());
break;
case ignite_type::INT64:
builder.append_int64(value.get<std::int64_t>());
break;
case ignite_type::FLOAT:
builder.append_float(value.get<float>());
break;
case ignite_type::DOUBLE:
builder.append_double(value.get<double>());
break;
case ignite_type::UUID:
builder.append_uuid(value.get<uuid>());
break;
case ignite_type::STRING:
builder.append_varlen(value.get<std::string>());
break;
case ignite_type::BYTE_ARRAY:
builder.append_varlen(value.get<std::vector<std::byte>>());
break;
case ignite_type::DECIMAL: {
big_decimal to_write;
value.get<big_decimal>().set_scale(scale, to_write);
builder.append_number(to_write);
break;
}
case ignite_type::DATE:
builder.append_date(value.get<ignite_date>());
break;
case ignite_type::TIME:
builder.append_time(value.get<ignite_time>());
break;
case ignite_type::DATETIME:
builder.append_date_time(value.get<ignite_date_time>());
break;
case ignite_type::TIMESTAMP:
builder.append_timestamp(value.get<ignite_timestamp>());
break;
case ignite_type::PERIOD:
builder.append_period(value.get<ignite_period>());
break;
case ignite_type::DURATION:
builder.append_duration(value.get<ignite_duration>());
break;
default:
throw ignite_error("Type with id " + std::to_string(int(typ)) + " is not yet supported");
}
}
/**
* Serialize tuple using table schema.
*
* @param sch Schema.
* @param tuple Tuple.
* @param key_only Should only key fields be serialized.
* @param no_value No value bitset.
* @return Serialized binary tuple.
*/
std::vector<std::byte> pack_tuple(
const schema &sch, const ignite_tuple &tuple, bool key_only, protocol::bitset_span &no_value) {
auto count = std::int32_t(key_only ? sch.key_columns.size() : sch.columns.size());
binary_tuple_builder builder{count};
builder.start();
auto col_indices = reinterpret_cast<std::int32_t *>(alloca(count * sizeof(std::int32_t)));
for (std::int32_t i = 0; i < count; ++i) {
const auto &col = sch.get_column(key_only, i);
auto col_idx = tuple.column_ordinal(col.name);
col_indices[i] = col_idx;
if (col_idx >= 0)
claim_column(builder, col.type, tuple.get(col_idx), col.scale);
else
builder.claim_null();
}
std::int32_t written = 0;
builder.layout();
for (std::int32_t i = 0; i < count; ++i) {
const auto &col = sch.get_column(key_only, i);
auto col_idx = col_indices[i];
if (col_idx >= 0) {
append_column(builder, col.type, tuple.get(col_idx), col.scale);
++written;
} else {
builder.append_null();
no_value.set(std::size_t(i));
}
}
if (!key_only && written < tuple.column_count()) {
std::vector<bool> written_ind(tuple.column_count(), false);
for (std::int32_t i = 0; i < count; ++i) {
auto col_idx = col_indices[i];
if (col_idx >= 0)
written_ind[col_idx] = true;
}
std::stringstream unmapped_columns;
for (std::int32_t i = 0; i < tuple.column_count(); ++i) {
if (written_ind[i])
continue;
auto &name = tuple.column_name(i);
unmapped_columns << name << ",";
}
auto unmapped_columns_str = unmapped_columns.str();
unmapped_columns_str.pop_back();
assert(!unmapped_columns_str.empty());
throw ignite_error("Tuple doesn't match schema: schemaVersion=" + std::to_string(sch.version)
+ ", extraColumns=" + unmapped_columns_str,
std::int32_t(error_flag::UNMAPPED_COLUMNS_PRESENT));
}
return builder.build();
}
ignite_tuple concat(const ignite_tuple &left, const ignite_tuple &right) {
// TODO: IGNITE-18855 eliminate unnecessary tuple transformation;
ignite_tuple res(left.column_count() + right.column_count());
res.m_pairs.assign(left.m_pairs.begin(), left.m_pairs.end());
res.m_indices.insert(left.m_indices.begin(), left.m_indices.end());
for (const auto &pair : right.m_pairs) {
bool inserted = res.m_indices.emplace(ignite_tuple::parse_name(pair.first), res.m_pairs.size()).second;
if (inserted) {
res.m_pairs.emplace_back(pair);
}
}
return res;
}
void write_tuple(protocol::writer &writer, const schema &sch, const ignite_tuple &tuple, bool key_only) {
const std::size_t count = key_only ? sch.key_columns.size() : sch.columns.size();
const std::size_t bytes_num = bytes_for_bits(count);
auto no_value_bytes = reinterpret_cast<std::byte *>(alloca(bytes_num));
memset(no_value_bytes, 0, bytes_num);
protocol::bitset_span no_value(no_value_bytes, bytes_num);
auto tuple_data = pack_tuple(sch, tuple, key_only, no_value);
writer.write_bitset(no_value.data());
writer.write_binary(tuple_data);
}
void write_tuples(protocol::writer &writer, const schema &sch, const std::vector<ignite_tuple> &tuples, bool key_only) {
writer.write(std::int32_t(tuples.size()));
for (auto &tuple : tuples)
write_tuple(writer, sch, tuple, key_only);
}
ignite_tuple read_tuple(protocol::reader &reader, const schema *sch, bool key_only) {
auto tuple_data = reader.read_binary();
auto columns_cnt = std::int32_t(key_only ? sch->key_columns.size() : sch->columns.size());
ignite_tuple res(columns_cnt);
binary_tuple_parser parser(columns_cnt, tuple_data);
for (std::int32_t i = 0; i < columns_cnt; ++i) {
auto &column = sch->get_column(key_only, i);
res.set(column.name, protocol::read_next_column(parser, column.type, column.scale));
}
return res;
}
std::optional<ignite_tuple> read_tuple_opt(protocol::reader &reader, const schema *sch) {
if (reader.try_read_nil())
return std::nullopt;
return read_tuple(reader, sch, false);
}
std::vector<ignite_tuple> read_tuples(protocol::reader &reader, const schema *sch, bool key_only) {
if (reader.try_read_nil())
return {};
auto count = reader.read_int32();
std::vector<ignite_tuple> res;
res.reserve(std::size_t(count));
for (std::int32_t i = 0; i < count; ++i)
res.emplace_back(read_tuple(reader, sch, key_only));
return res;
}
std::vector<std::optional<ignite_tuple>> read_tuples_opt(protocol::reader &reader, const schema *sch, bool key_only) {
if (reader.try_read_nil())
return {};
auto count = reader.read_int32();
std::vector<std::optional<ignite_tuple>> res;
res.reserve(std::size_t(count));
for (std::int32_t i = 0; i < count; ++i) {
auto exists = reader.read_bool();
if (!exists)
res.emplace_back(std::nullopt);
else
res.emplace_back(read_tuple(reader, sch, key_only));
}
return res;
}
cluster_node read_cluster_node(protocol::reader &reader) {
auto fields_count = reader.read_int32();
assert(fields_count >= 4);
auto id = reader.read_uuid();
auto name = reader.read_string();
auto host = reader.read_string();
auto port = reader.read_uint16();
reader.skip(fields_count - 4);
return {std::move(id), std::move(name), end_point{std::move(host), port}};
}
} // namespace ignite::detail