blob: 9c57eefc96c785545ff2e2a11bc172f0d8019f34 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <ignite/common/bytes_view.h>
#include <ignite/common/ignite_error.h>
#include <ignite/common/uuid.h>
#include <ignite/protocol/utils.h>
#include <msgpack.h>
#include <cstdint>
#include <functional>
namespace ignite::protocol {
/**
* Reader.
*/
class reader {
public:
// Deleted
reader() = delete;
reader(reader &&) = delete;
reader(const reader &) = delete;
reader &operator=(reader &&) = delete;
reader &operator=(const reader &) = delete;
/**
* Constructor.
*
* @param buffer Buffer.
*/
explicit reader(bytes_view buffer);
/**
* Destructor.
*/
~reader() { msgpack_unpacked_destroy(&m_current_val); }
/**
* Read object of type T from msgpack stream.
*
* @tparam T Type of the object to read.
* @return Object of type T.
* @throw ignite_error if there is no object of specified type in the stream.
*/
template<typename T>
[[nodiscard]] T read_object() {
check_data_in_stream();
auto res = unpack_object<T>(m_current_val.data);
next();
return res;
}
/**
* Read object of type T from msgpack stream.
*
* @tparam T Type of the object to read.
* @return Object of type T or @c nullopt if there is object of other type in the stream.
* @throw ignite_error if there is no data left in the stream.
*/
template<typename T>
[[nodiscard]] std::optional<T> try_read_object() {
check_data_in_stream();
auto res = try_unpack_object<T>(m_current_val.data);
if (res)
next();
return res;
}
/**
* Read object of type T from msgpack stream or nil.
*
* @tparam T Type of the object to read.
* @return Object of type T or std::nullopt if there is nil in the stream.
* @throw ignite_error if there is no object of specified type in the stream.
*/
template<typename T>
[[nodiscard]] std::optional<T> read_object_nullable() {
if (try_read_nil())
return std::nullopt;
return read_object<T>();
}
/**
* Read object of type T from msgpack stream or returns default value if the value in stream is nil.
*
* @tparam T Type of the object to read.
* @param on_nil Object to be returned on nil.
* @return Object of type T or @c on_nil if there is nil in stream.
* @throw ignite_error if there is no object of specified type in the stream.
*/
template<typename T>
[[nodiscard]] T read_object_or_default(T &&on_nil) {
if (try_read_nil())
return std::forward<T>(on_nil);
return read_object<T>();
}
/**
* Read int8.
*
* @return Value.
*/
[[nodiscard]] std::int8_t read_int8() { return read_object<std::int8_t>(); }
/**
* Read uint8.
*
* @return Value.
*/
[[nodiscard]] std::uint8_t read_uint8() { return read_object<std::uint8_t>(); }
/**
* Read int16.
*
* @return Value.
*/
[[nodiscard]] std::int16_t read_int16() { return read_object<std::int16_t>(); }
/**
* Read uint16.
*
* @return Value.
*/
[[nodiscard]] std::uint16_t read_uint16() { return read_object<std::uint16_t>(); }
/**
* Read int32.
*
* @return Value.
*/
[[nodiscard]] std::int32_t read_int32() { return read_object<std::int32_t>(); }
/**
* Read timestamp.
*
* @return Timestamp.
*/
[[nodiscard]] ignite_timestamp read_timestamp() {
auto seconds = read_int64();
auto nanos = read_int32();
return {seconds, nanos};
}
/**
* Read timestamp or null.
*
* @return Timestamp or std::nullopt.
*/
[[nodiscard]] std::optional<ignite_timestamp> read_timestamp_opt() {
if (try_read_nil())
return std::nullopt;
return {read_timestamp()};
}
/**
* Read array of int32.
*
* @return Value.
*/
[[nodiscard]] std::vector<std::int32_t> read_int32_array() {
auto length = read_int32();
std::vector<std::int32_t> values(length);
for (auto i = 0; i < length; i++) {
values[i] = read_int32();
}
return values;
}
/**
* Read array of int32.
*
* @return Value or nullopt.
*/
[[nodiscard]] std::optional<std::vector<std::int32_t>> read_int32_array_nullable() {
if (try_read_nil()) {
return std::nullopt;
}
return read_int32_array();
}
/**
* Read array of int64.
*
* @return Value or nullopt.
*/
[[nodiscard]] std::vector<std::int64_t> read_int64_array() {
auto length = read_int32();
std::vector<std::int64_t> values(length);
for (auto i = 0; i < length; i++) {
values[i] = read_int64();
}
return values;
}
/**
* Read array of int64.
*
* @return Value or nullopt.
*/
[[nodiscard]] std::optional<std::vector<std::int64_t>> read_int64_array_nullable() {
if (try_read_nil()) {
return std::nullopt;
}
return read_int64_array();
}
/**
* Read int32 or nullopt.
*
* @return Value or nullopt if the next value in stream is not integer.
*/
[[nodiscard]] std::optional<std::int32_t> try_read_int32() { return try_read_object<std::int32_t>(); }
/**
* Read int32 or nullopt.
*
* @return Value or nullopt if the next value in stream is nil.
*/
[[nodiscard]] std::optional<std::int32_t> read_int32_nullable() { return read_object_nullable<std::int32_t>(); }
/**
* Read uint8 or nullopt.
*
* @return Value or nullopt if the next value in stream is nil.
*/
[[nodiscard]] std::optional<std::uint8_t> read_uint8_nullable() { return read_object_nullable<std::uint8_t>(); }
/**
* Read int16 or nullopt.
*
* @return Value or nullopt if the next value in stream is nil.
*/
[[nodiscard]] std::optional<std::int16_t> read_int16_nullable() { return read_object_nullable<std::int16_t>(); }
/**
* Read int64 number.
*
* @return Value.
*/
[[nodiscard]] std::int64_t read_int64() { return read_object<int64_t>(); }
/**
* Read bool.
*
* @return Value.
*/
[[nodiscard]] bool read_bool() { return read_object<bool>(); }
/**
* Read string.
*
* @return String value.
*/
[[nodiscard]] std::string read_string() { return read_object<std::string>(); }
/**
* Read string.
*
* @return String value or nullopt.
*/
[[nodiscard]] std::optional<std::string> read_string_nullable() { return read_object_nullable<std::string>(); }
/**
* Read UUID.
*
* @return UUID value.
*/
[[nodiscard]] uuid read_uuid() { return read_object<uuid>(); }
/**
* Read array.
*
* @return Binary data view.
*/
[[nodiscard]] bytes_view read_binary() {
auto res = unpack_binary(m_current_val.data);
next();
return res;
}
/**
* If the next value is Nil, read it and move reader to the next position.
*
* @return @c true if the value was nil.
*/
bool try_read_nil();
/**
* Skip next value.
*/
void skip() { next(); }
/**
* Skip next value.
*/
void skip(int count) {
for (int i = 0; i < count; i++) {
skip();
}
}
/**
* Position.
*
* @return Current position in memory.
*/
[[nodiscard]] size_t position() const { return m_offset; }
private:
/**
* Move to the next value.
*/
void next();
/**
* Check whether there is a data in stream and throw ignite_error if there is none.
*/
void check_data_in_stream() const {
if (m_move_res < 0)
throw ignite_error("No more data in stream");
}
/** Buffer. */
bytes_view m_buffer;
/** Current value. */
msgpack_unpacked m_current_val;
/** Result of the last move operation. */
msgpack_unpack_return m_move_res;
/** Offset to next value. */
std::size_t m_offset_next{0};
/** Offset. */
std::size_t m_offset{0};
};
} // namespace ignite::protocol