blob: 184cd45d86a4db8d8d402eaf3e6539fd4692a4ad [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ignite/protocol/utils.h"
#include "ignite/protocol/reader.h"
#include <msgpack.h>
#include <limits>
#include <mutex>
#include <random>
#include <sstream>
#include <type_traits>
namespace ignite::protocol {
/**
* Check if int value fits in @c T.
*
* @tparam T Int type to fit value to.
* @param value Int value.
*/
template<typename T>
inline void check_int_fits(std::int64_t value) {
if (value > std::int64_t(std::numeric_limits<T>::max()))
throw ignite_error("The number in stream is too large to fit in type: " + std::to_string(value));
if (value < std::int64_t(std::numeric_limits<T>::min()))
throw ignite_error("The number in stream is too small to fit in type: " + std::to_string(value));
}
template<typename T>
std::optional<T> try_unpack_int(const msgpack_object &object) {
static_assert(
std::numeric_limits<T>::is_integer && std::numeric_limits<T>::is_signed, "Type T is not a signed integer type");
auto i64_val = try_unpack_object<std::int64_t>(object);
if (!i64_val)
return std::nullopt;
check_int_fits<T>(*i64_val);
return T(*i64_val);
}
template<>
std::optional<std::int64_t> try_unpack_object(const msgpack_object &object) {
if (object.type != MSGPACK_OBJECT_NEGATIVE_INTEGER && object.type != MSGPACK_OBJECT_POSITIVE_INTEGER)
return std::nullopt;
return object.via.i64;
}
template<>
std::optional<std::int32_t> try_unpack_object(const msgpack_object &object) {
return try_unpack_int<std::int32_t>(object);
}
template<>
std::optional<std::string> try_unpack_object(const msgpack_object &object) {
if (object.type != MSGPACK_OBJECT_STR)
return std::nullopt;
return std::string{object.via.str.ptr, object.via.str.size};
}
template<typename T>
T unpack_int(const msgpack_object &object) {
static_assert(
std::numeric_limits<T>::is_integer && std::numeric_limits<T>::is_signed, "Type T is not a signed integer type");
auto i64_val = unpack_object<std::int64_t>(object);
check_int_fits<T>(i64_val);
return T(i64_val);
}
template<>
std::optional<std::string> unpack_nullable(const msgpack_object &object) {
if (object.type == MSGPACK_OBJECT_NIL)
return std::nullopt;
return unpack_object<std::string>(object);
}
template<>
std::int64_t unpack_object(const msgpack_object &object) {
if (object.type != MSGPACK_OBJECT_NEGATIVE_INTEGER && object.type != MSGPACK_OBJECT_POSITIVE_INTEGER)
throw ignite_error("The value in stream is not an integer number : " + std::to_string(object.type));
return object.via.i64;
}
template<>
std::int32_t unpack_object(const msgpack_object &object) {
return unpack_int<std::int32_t>(object);
}
template<>
std::int16_t unpack_object(const msgpack_object &object) {
return unpack_int<std::int16_t>(object);
}
template<>
std::int8_t unpack_object(const msgpack_object &object) {
return unpack_int<std::int8_t>(object);
}
template<>
std::string unpack_object(const msgpack_object &object) {
if (object.type != MSGPACK_OBJECT_STR)
throw ignite_error("The value in stream is not a string : " + std::to_string(object.type));
return {object.via.str.ptr, object.via.str.size};
}
template<>
uuid unpack_object(const msgpack_object &object) {
if (object.type != MSGPACK_OBJECT_EXT && object.via.ext.type != std::int8_t(extension_type::UUID))
throw ignite_error("The value in stream is not a UUID : " + std::to_string(object.type));
if (object.via.ext.size != 16)
throw ignite_error("Unexpected UUID size: " + std::to_string(object.via.ext.size));
auto data = reinterpret_cast<const std::byte *>(object.via.ext.ptr);
auto msb = bytes::load<endian::LITTLE, int64_t>(data);
auto lsb = bytes::load<endian::LITTLE, int64_t>(data + 8);
return {msb, lsb};
}
template<>
bool unpack_object(const msgpack_object &object) {
if (object.type != MSGPACK_OBJECT_BOOLEAN)
throw ignite_error("The value in stream is not a bool : " + std::to_string(object.type));
return object.via.boolean;
}
std::uint32_t unpack_array_size(const msgpack_object &object) {
if (object.type != MSGPACK_OBJECT_ARRAY)
throw ignite_error("The value in stream is not an Array : " + std::to_string(object.type));
return object.via.array.size;
}
void unpack_array_raw(const msgpack_object &object, const std::function<void(const msgpack_object &)> &read_func) {
auto size = unpack_array_size(object);
for (std::uint32_t i = 0; i < size; ++i)
read_func(object.via.array.ptr[i]);
}
bytes_view unpack_binary(const msgpack_object &object) {
if (object.type != MSGPACK_OBJECT_BIN)
throw ignite_error("The value in stream is not a Binary data : " + std::to_string(object.type));
return {reinterpret_cast<const std::byte *>(object.via.bin.ptr), object.via.bin.size};
}
uuid make_random_uuid() {
static std::mutex randomMutex;
static std::random_device rd;
static std::mt19937 gen(rd());
std::uniform_int_distribution<int64_t> distrib;
std::lock_guard<std::mutex> lock(randomMutex);
return {distrib(gen), distrib(gen)};
}
std::optional<ignite_error> read_error(reader &reader) {
if (reader.try_read_nil())
return std::nullopt;
auto trace_id = reader.try_read_nil() ? make_random_uuid() : reader.read_uuid();
auto code = reader.read_object_or_default<std::int32_t>(65537);
auto class_name = reader.read_string();
auto message = reader.read_string_nullable();
auto java_stack_trace = reader.read_string_nullable();
UNUSED_VALUE java_stack_trace;
std::stringstream err_msg_builder;
err_msg_builder << class_name;
if (message)
err_msg_builder << ": " << *message;
err_msg_builder << " (" << code << ", " << trace_id << ")";
return {ignite_error(status_code(code), err_msg_builder.str())};
}
void claim_primitive_with_type(binary_tuple_builder &builder, const primitive &value) {
if (value.is_null()) {
builder.claim_null(); // Type.
builder.claim_null(); // Scale.
builder.claim_null(); // Value.
return;
}
switch (value.get_type()) {
case ignite_type::BOOLEAN: {
claim_type_and_scale(builder, ignite_type::BOOLEAN);
builder.claim_bool(value.get<bool>());
break;
}
case ignite_type::INT8: {
claim_type_and_scale(builder, ignite_type::INT8);
builder.claim_int8(value.get<std::int8_t>());
break;
}
case ignite_type::INT16: {
claim_type_and_scale(builder, ignite_type::INT16);
builder.claim_int16(value.get<std::int16_t>());
break;
}
case ignite_type::INT32: {
claim_type_and_scale(builder, ignite_type::INT32);
builder.claim_int32(value.get<std::int32_t>());
break;
}
case ignite_type::INT64: {
claim_type_and_scale(builder, ignite_type::INT64);
builder.claim_int64(value.get<std::int64_t>());
break;
}
case ignite_type::FLOAT: {
claim_type_and_scale(builder, ignite_type::FLOAT);
builder.claim_float(value.get<float>());
break;
}
case ignite_type::DOUBLE: {
claim_type_and_scale(builder, ignite_type::DOUBLE);
builder.claim_double(value.get<double>());
break;
}
case ignite_type::UUID: {
claim_type_and_scale(builder, ignite_type::UUID);
builder.claim_uuid(value.get<uuid>());
break;
}
case ignite_type::STRING: {
claim_type_and_scale(builder, ignite_type::STRING);
builder.claim_varlen(value.get<std::string>());
break;
}
case ignite_type::BYTE_ARRAY: {
claim_type_and_scale(builder, ignite_type::BYTE_ARRAY);
auto &data = value.get<std::vector<std::byte>>();
builder.claim_varlen(data);
break;
}
case ignite_type::DECIMAL: {
const auto &dec_value = value.get<big_decimal>();
claim_type_and_scale(builder, ignite_type::DECIMAL, dec_value.get_scale());
builder.claim_number(dec_value);
break;
}
case ignite_type::NUMBER: {
claim_type_and_scale(builder, ignite_type::NUMBER);
builder.claim_number(value.get<big_integer>());
break;
}
case ignite_type::DATE: {
claim_type_and_scale(builder, ignite_type::DATE);
builder.claim_date(value.get<ignite_date>());
break;
}
case ignite_type::TIME: {
claim_type_and_scale(builder, ignite_type::TIME);
builder.claim_time(value.get<ignite_time>());
break;
}
case ignite_type::DATETIME: {
claim_type_and_scale(builder, ignite_type::DATETIME);
builder.claim_date_time(value.get<ignite_date_time>());
break;
}
case ignite_type::TIMESTAMP: {
claim_type_and_scale(builder, ignite_type::TIMESTAMP);
builder.claim_timestamp(value.get<ignite_timestamp>());
break;
}
case ignite_type::PERIOD: {
claim_type_and_scale(builder, ignite_type::PERIOD);
builder.claim_period(value.get<ignite_period>());
break;
}
case ignite_type::DURATION: {
claim_type_and_scale(builder, ignite_type::DURATION);
builder.claim_duration(value.get<ignite_duration>());
break;
}
case ignite_type::BITMASK: {
claim_type_and_scale(builder, ignite_type::BITMASK);
builder.claim_varlen(value.get<bit_array>().get_raw());
break;
}
default:
throw ignite_error("Unsupported type: " + std::to_string(int(value.get_type())));
}
}
void append_primitive_with_type(binary_tuple_builder &builder, const primitive &value) {
if (value.is_null()) {
builder.append_null(); // Type.
builder.append_null(); // Scale.
builder.append_null(); // Value.
return;
}
switch (value.get_type()) {
case ignite_type::BOOLEAN: {
append_type_and_scale(builder, ignite_type::BOOLEAN);
builder.append_bool(value.get<bool>());
break;
}
case ignite_type::INT8: {
append_type_and_scale(builder, ignite_type::INT8);
builder.append_int8(value.get<std::int8_t>());
break;
}
case ignite_type::INT16: {
append_type_and_scale(builder, ignite_type::INT16);
builder.append_int16(value.get<std::int16_t>());
break;
}
case ignite_type::INT32: {
append_type_and_scale(builder, ignite_type::INT32);
builder.append_int32(value.get<std::int32_t>());
break;
}
case ignite_type::INT64: {
append_type_and_scale(builder, ignite_type::INT64);
builder.append_int64(value.get<std::int64_t>());
break;
}
case ignite_type::FLOAT: {
append_type_and_scale(builder, ignite_type::FLOAT);
builder.append_float(value.get<float>());
break;
}
case ignite_type::DOUBLE: {
append_type_and_scale(builder, ignite_type::DOUBLE);
builder.append_double(value.get<double>());
break;
}
case ignite_type::UUID: {
append_type_and_scale(builder, ignite_type::UUID);
builder.append_uuid(value.get<uuid>());
break;
}
case ignite_type::STRING: {
append_type_and_scale(builder, ignite_type::STRING);
builder.append_varlen(value.get<std::string>());
break;
}
case ignite_type::BYTE_ARRAY: {
append_type_and_scale(builder, ignite_type::BYTE_ARRAY);
auto &data = value.get<std::vector<std::byte>>();
builder.append_varlen(data);
break;
}
case ignite_type::DECIMAL: {
const auto &dec_value = value.get<big_decimal>();
append_type_and_scale(builder, ignite_type::DECIMAL, dec_value.get_scale());
builder.append_number(dec_value);
break;
}
case ignite_type::NUMBER: {
append_type_and_scale(builder, ignite_type::NUMBER);
builder.append_number(value.get<big_integer>());
break;
}
case ignite_type::DATE: {
append_type_and_scale(builder, ignite_type::DATE);
builder.append_date(value.get<ignite_date>());
break;
}
case ignite_type::TIME: {
append_type_and_scale(builder, ignite_type::TIME);
builder.append_time(value.get<ignite_time>());
break;
}
case ignite_type::DATETIME: {
append_type_and_scale(builder, ignite_type::DATETIME);
builder.append_date_time(value.get<ignite_date_time>());
break;
}
case ignite_type::TIMESTAMP: {
append_type_and_scale(builder, ignite_type::TIMESTAMP);
builder.append_timestamp(value.get<ignite_timestamp>());
break;
}
case ignite_type::PERIOD: {
append_type_and_scale(builder, ignite_type::PERIOD);
builder.append_period(value.get<ignite_period>());
break;
}
case ignite_type::DURATION: {
append_type_and_scale(builder, ignite_type::DURATION);
builder.append_duration(value.get<ignite_duration>());
break;
}
case ignite_type::BITMASK: {
append_type_and_scale(builder, ignite_type::BITMASK);
builder.append_varlen(value.get<bit_array>().get_raw());
break;
}
default:
throw ignite_error("Unsupported type: " + std::to_string(int(value.get_type())));
}
}
primitive read_next_column(binary_tuple_parser &parser, ignite_type typ, std::int32_t scale) {
auto val = parser.get_next();
if (val.empty())
return {};
switch (typ) {
case ignite_type::BOOLEAN:
return binary_tuple_parser::get_bool(val);
case ignite_type::INT8:
return binary_tuple_parser::get_int8(val);
case ignite_type::INT16:
return binary_tuple_parser::get_int16(val);
case ignite_type::INT32:
return binary_tuple_parser::get_int32(val);
case ignite_type::INT64:
return binary_tuple_parser::get_int64(val);
case ignite_type::FLOAT:
return binary_tuple_parser::get_float(val);
case ignite_type::DOUBLE:
return binary_tuple_parser::get_double(val);
case ignite_type::UUID:
return binary_tuple_parser::get_uuid(val);
case ignite_type::STRING:
return std::string(binary_tuple_parser::get_varlen(val));
case ignite_type::BYTE_ARRAY:
return std::vector<std::byte>(binary_tuple_parser::get_varlen(val));
case ignite_type::DECIMAL:
return binary_tuple_parser::get_decimal(val, scale);
case ignite_type::NUMBER:
return binary_tuple_parser::get_number(val);
case ignite_type::DATE:
return binary_tuple_parser::get_date(val);
case ignite_type::TIME:
return binary_tuple_parser::get_time(val);
case ignite_type::DATETIME:
return binary_tuple_parser::get_date_time(val);
case ignite_type::TIMESTAMP:
return binary_tuple_parser::get_timestamp(val);
case ignite_type::PERIOD:
return binary_tuple_parser::get_period(val);
case ignite_type::DURATION:
return binary_tuple_parser::get_duration(val);
case ignite_type::BITMASK:
return bit_array(binary_tuple_parser::get_varlen(val));
default:
throw ignite_error("Type with id " + std::to_string(int(typ)) + " is not yet supported");
}
}
} // namespace ignite::protocol