blob: 4bf247b3da251cfa3da85477d7a73c741b329ac3 [file]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "paimon/common/data/serializer/row_compacted_serializer.h"
#include "paimon/common/data/binary_array.h"
#include "paimon/common/data/binary_array_writer.h"
#include "paimon/common/data/binary_map.h"
#include "paimon/common/data/binary_row_writer.h"
#include "paimon/common/data/data_define.h"
#include "paimon/common/data/generic_row.h"
#include "paimon/common/data/serializer/binary_serializer_utils.h"
#include "paimon/common/utils/date_time_utils.h"
#include "paimon/common/utils/fields_comparator.h"
namespace paimon {
Result<std::unique_ptr<RowCompactedSerializer>> RowCompactedSerializer::Create(
const std::shared_ptr<arrow::Schema>& schema, const std::shared_ptr<MemoryPool>& pool) {
std::vector<InternalRow::FieldGetterFunc> getters(schema->num_fields());
std::vector<RowCompactedSerializer::FieldWriter> writers(schema->num_fields());
std::vector<RowCompactedSerializer::FieldReader> readers(schema->num_fields());
for (int32_t i = 0; i < schema->num_fields(); i++) {
auto field_type = schema->field(i)->type();
PAIMON_ASSIGN_OR_RAISE(getters[i],
InternalRow::CreateFieldGetter(i, field_type, /*use_view=*/true));
PAIMON_ASSIGN_OR_RAISE(writers[i], CreateFieldWriter(field_type, pool));
PAIMON_ASSIGN_OR_RAISE(readers[i], CreateFieldReader(field_type, pool));
}
return std::unique_ptr<RowCompactedSerializer>(new RowCompactedSerializer(
schema, std::move(getters), std::move(writers), std::move(readers), pool));
}
Result<int32_t> RowCompactedSerializer::CompareField(const FieldInfo& field_info,
RowReader* reader1, RowReader* reader2) {
auto type = field_info.type_id;
switch (type) {
case arrow::Type::type::BOOL: {
auto val1 = reader1->ReadValue<bool>();
auto val2 = reader2->ReadValue<bool>();
return val1 == val2 ? 0 : (val1 < val2 ? -1 : 1);
}
case arrow::Type::type::INT8: {
auto val1 = reader1->ReadValue<char>();
auto val2 = reader2->ReadValue<char>();
return val1 == val2 ? 0 : (val1 < val2 ? -1 : 1);
}
case arrow::Type::type::INT16: {
auto val1 = reader1->ReadValue<int16_t>();
auto val2 = reader2->ReadValue<int16_t>();
return val1 == val2 ? 0 : (val1 < val2 ? -1 : 1);
}
case arrow::Type::type::INT32:
case arrow::Type::type::DATE32: {
auto val1 = reader1->ReadValue<int32_t>();
auto val2 = reader2->ReadValue<int32_t>();
return val1 == val2 ? 0 : (val1 < val2 ? -1 : 1);
}
case arrow::Type::type::INT64: {
auto val1 = reader1->ReadValue<int64_t>();
auto val2 = reader2->ReadValue<int64_t>();
return val1 == val2 ? 0 : (val1 < val2 ? -1 : 1);
}
case arrow::Type::type::FLOAT: {
auto val1 = reader1->ReadValue<float>();
auto val2 = reader2->ReadValue<float>();
return FieldsComparator::CompareFloatingPoint(val1, val2);
}
case arrow::Type::type::DOUBLE: {
auto val1 = reader1->ReadValue<double>();
auto val2 = reader2->ReadValue<double>();
return FieldsComparator::CompareFloatingPoint(val1, val2);
}
case arrow::Type::type::STRING:
case arrow::Type::type::BINARY: {
PAIMON_ASSIGN_OR_RAISE(std::string_view val1, reader1->ReadStringView());
PAIMON_ASSIGN_OR_RAISE(std::string_view val2, reader2->ReadStringView());
int32_t cmp = val1.compare(val2);
return cmp == 0 ? 0 : (cmp > 0 ? 1 : -1);
}
case arrow::Type::type::TIMESTAMP: {
PAIMON_ASSIGN_OR_RAISE(Timestamp val1, reader1->ReadTimestamp(field_info.precision));
PAIMON_ASSIGN_OR_RAISE(Timestamp val2, reader2->ReadTimestamp(field_info.precision));
return val1 == val2 ? 0 : (val1 < val2 ? -1 : 1);
}
case arrow::Type::type::DECIMAL: {
PAIMON_ASSIGN_OR_RAISE(Decimal val1,
reader1->ReadDecimal(field_info.precision, field_info.scale));
PAIMON_ASSIGN_OR_RAISE(Decimal val2,
reader2->ReadDecimal(field_info.precision, field_info.scale));
int32_t cmp = val1.CompareTo(val2);
return cmp == 0 ? 0 : (cmp > 0 ? 1 : -1);
}
default:
return Status::NotImplemented(
fmt::format("Do not support comparing type {} in CompareField",
static_cast<int32_t>(field_info.type_id)));
}
}
Result<MemorySlice::SliceComparator> RowCompactedSerializer::CreateSliceComparator(
const std::shared_ptr<arrow::Schema>& schema, const std::shared_ptr<MemoryPool>& pool) {
int32_t bit_set_in_bytes = RowCompactedSerializer::CalculateBitSetInBytes(schema->num_fields());
auto row_reader1 = std::make_shared<RowReader>(bit_set_in_bytes, pool);
auto row_reader2 = std::make_shared<RowReader>(bit_set_in_bytes, pool);
std::vector<FieldInfo> field_infos(schema->num_fields());
for (int32_t i = 0; i < schema->num_fields(); i++) {
auto field_type = schema->field(i)->type();
field_infos[i].type_id = field_type->id();
if (field_type->id() == arrow::Type::type::TIMESTAMP) {
auto timestamp_type =
arrow::internal::checked_pointer_cast<arrow::TimestampType>(field_type);
assert(timestamp_type);
field_infos[i].precision = DateTimeUtils::GetPrecisionFromType(timestamp_type);
} else if (field_type->id() == arrow::Type::type::DECIMAL) {
auto decimal_type =
arrow::internal::checked_pointer_cast<arrow::Decimal128Type>(field_type);
assert(decimal_type);
field_infos[i].precision = decimal_type->precision();
field_infos[i].scale = decimal_type->scale();
}
}
auto comparator = [row_reader1, row_reader2, field_infos](
const MemorySlice& slice1, const MemorySlice& slice2) -> Result<int32_t> {
row_reader1->PointTo(slice1.GetSegment(), slice1.Offset());
row_reader2->PointTo(slice2.GetSegment(), slice2.Offset());
for (int32_t i = 0; i < static_cast<int32_t>(field_infos.size()); i++) {
bool is_null1 = row_reader1->IsNullAt(i);
bool is_null2 = row_reader2->IsNullAt(i);
if (!is_null1 || !is_null2) {
if (is_null1) {
return -1;
} else if (is_null2) {
return 1;
} else {
PAIMON_ASSIGN_OR_RAISE(
int32_t comp,
CompareField(field_infos[i], row_reader1.get(), row_reader2.get()));
if (comp != 0) {
return comp;
}
}
}
}
return 0;
};
return std::function<Result<int32_t>(const MemorySlice&, const MemorySlice&)>(comparator);
}
Result<std::shared_ptr<Bytes>> RowCompactedSerializer::SerializeToBytes(const InternalRow& row) {
if (!row_writer_) {
row_writer_ = std::make_unique<RowWriter>(CalculateBitSetInBytes(getters_.size()), pool_);
}
row_writer_->Reset();
PAIMON_ASSIGN_OR_RAISE(const RowKind* row_kind, row.GetRowKind());
row_writer_->WriteRowKind(*row_kind);
for (size_t i = 0; i < getters_.size(); i++) {
VariantType field = getters_[i](row);
PAIMON_RETURN_NOT_OK(writers_[i](i, field, row_writer_.get()));
}
return row_writer_->CopyBuffer();
}
Result<std::unique_ptr<InternalRow>> RowCompactedSerializer::Deserialize(
const std::shared_ptr<Bytes>& bytes) {
if (!row_reader_) {
row_reader_ = std::make_unique<RowReader>(CalculateBitSetInBytes(getters_.size()), pool_);
}
row_reader_->PointTo(bytes);
auto row = std::make_unique<GenericRow>(getters_.size());
PAIMON_ASSIGN_OR_RAISE(const RowKind* row_kind, row_reader_->ReadRowKind());
row->SetRowKind(row_kind);
for (size_t i = 0; i < readers_.size(); i++) {
PAIMON_ASSIGN_OR_RAISE(VariantType field, readers_[i](i, row_reader_.get()));
row->SetField(i, field);
}
row->AddDataHolder(bytes);
return row;
}
RowCompactedSerializer::RowCompactedSerializer(
const std::shared_ptr<arrow::Schema>& schema,
std::vector<InternalRow::FieldGetterFunc>&& getters,
std::vector<RowCompactedSerializer::FieldWriter>&& writers,
std::vector<RowCompactedSerializer::FieldReader>&& readers,
const std::shared_ptr<MemoryPool>& pool)
: pool_(pool),
schema_(schema),
getters_(std::move(getters)),
writers_(std::move(writers)),
readers_(std::move(readers)) {}
Result<RowCompactedSerializer::FieldReader> RowCompactedSerializer::CreateFieldReader(
const std::shared_ptr<arrow::DataType>& field_type, const std::shared_ptr<MemoryPool>& pool) {
arrow::Type::type type = field_type->id();
RowCompactedSerializer::FieldReader field_reader;
switch (type) {
case arrow::Type::type::BOOL: {
field_reader = [](int32_t pos, RowReader* reader) -> Result<VariantType> {
return VariantType(reader->ReadValue<bool>());
};
break;
}
case arrow::Type::type::INT8: {
field_reader = [](int32_t pos, RowReader* reader) -> Result<VariantType> {
return VariantType(reader->ReadValue<char>());
};
break;
}
case arrow::Type::type::INT16: {
field_reader = [](int32_t pos, RowReader* reader) -> Result<VariantType> {
return VariantType(reader->ReadValue<int16_t>());
};
break;
}
case arrow::Type::type::INT32:
case arrow::Type::type::DATE32: {
field_reader = [](int32_t pos, RowReader* reader) -> Result<VariantType> {
return VariantType(reader->ReadValue<int32_t>());
};
break;
}
case arrow::Type::type::INT64: {
field_reader = [](int32_t pos, RowReader* reader) -> Result<VariantType> {
return VariantType(reader->ReadValue<int64_t>());
};
break;
}
case arrow::Type::type::FLOAT: {
field_reader = [](int32_t pos, RowReader* reader) -> Result<VariantType> {
return VariantType(reader->ReadValue<float>());
};
break;
}
case arrow::Type::type::DOUBLE: {
field_reader = [](int32_t pos, RowReader* reader) -> Result<VariantType> {
return VariantType(reader->ReadValue<double>());
};
break;
}
case arrow::Type::type::STRING: {
field_reader = [](int32_t pos, RowReader* reader) -> Result<VariantType> {
PAIMON_ASSIGN_OR_RAISE(VariantType value, reader->ReadStringView());
return value;
};
break;
}
case arrow::Type::type::BINARY: {
field_reader = [](int32_t pos, RowReader* reader) -> Result<VariantType> {
PAIMON_ASSIGN_OR_RAISE(VariantType value, reader->ReadBinary());
return value;
};
break;
}
case arrow::Type::type::TIMESTAMP: {
auto timestamp_type =
arrow::internal::checked_pointer_cast<arrow::TimestampType>(field_type);
int32_t precision = DateTimeUtils::GetPrecisionFromType(timestamp_type);
field_reader = [precision](int32_t pos, RowReader* reader) -> Result<VariantType> {
PAIMON_ASSIGN_OR_RAISE(VariantType value, reader->ReadTimestamp(precision));
return value;
};
break;
}
case arrow::Type::type::DECIMAL: {
auto* decimal_type =
arrow::internal::checked_cast<arrow::Decimal128Type*>(field_type.get());
assert(decimal_type);
auto precision = decimal_type->precision();
auto scale = decimal_type->scale();
field_reader = [precision, scale](int32_t pos,
RowReader* reader) -> Result<VariantType> {
PAIMON_ASSIGN_OR_RAISE(VariantType value, reader->ReadDecimal(precision, scale));
return value;
};
break;
}
case arrow::Type::type::LIST: {
field_reader = [](int32_t pos, RowReader* reader) -> Result<VariantType> {
PAIMON_ASSIGN_OR_RAISE(VariantType value, reader->ReadArray());
return value;
};
break;
}
case arrow::Type::type::MAP: {
field_reader = [](int32_t pos, RowReader* reader) -> Result<VariantType> {
PAIMON_ASSIGN_OR_RAISE(VariantType value, reader->ReadMap());
return value;
};
break;
}
case arrow::Type::type::STRUCT: {
auto* struct_type = arrow::internal::checked_cast<arrow::StructType*>(field_type.get());
assert(struct_type);
PAIMON_ASSIGN_OR_RAISE(
std::shared_ptr<RowCompactedSerializer> serializer,
RowCompactedSerializer::Create(arrow::schema(struct_type->fields()), pool));
field_reader = [serializer](int32_t pos, RowReader* reader) -> Result<VariantType> {
PAIMON_ASSIGN_OR_RAISE(VariantType value, reader->ReadRow(serializer));
return value;
};
break;
}
default:
return Status::Invalid(
fmt::format("type {} not support in FieldReader in row compacted serializer",
field_type->ToString()));
}
RowCompactedSerializer::FieldReader ret =
[field_reader](int32_t pos, RowReader* reader) -> Result<VariantType> {
if (reader->IsNullAt(pos)) {
return VariantType(NullType());
}
return field_reader(pos, reader);
};
return ret;
}
Result<RowCompactedSerializer::FieldWriter> RowCompactedSerializer::CreateFieldWriter(
const std::shared_ptr<arrow::DataType>& field_type, const std::shared_ptr<MemoryPool>& pool) {
arrow::Type::type type = field_type->id();
RowCompactedSerializer::FieldWriter field_writer;
switch (type) {
case arrow::Type::type::BOOL: {
field_writer = [](int32_t pos, const VariantType& field, RowWriter* writer) -> Status {
return writer->WriteValue<bool>(DataDefine::GetVariantValue<bool>(field));
};
break;
}
case arrow::Type::type::INT8: {
field_writer = [](int32_t pos, const VariantType& field, RowWriter* writer) -> Status {
return writer->WriteValue<char>(DataDefine::GetVariantValue<char>(field));
};
break;
}
case arrow::Type::type::INT16: {
field_writer = [](int32_t pos, const VariantType& field, RowWriter* writer) -> Status {
return writer->WriteValue<int16_t>(DataDefine::GetVariantValue<int16_t>(field));
};
break;
}
case arrow::Type::type::INT32:
case arrow::Type::type::DATE32: {
field_writer = [](int32_t pos, const VariantType& field, RowWriter* writer) -> Status {
return writer->WriteValue<int32_t>(DataDefine::GetVariantValue<int32_t>(field));
};
break;
}
case arrow::Type::type::INT64: {
field_writer = [](int32_t pos, const VariantType& field, RowWriter* writer) -> Status {
return writer->WriteValue<int64_t>(DataDefine::GetVariantValue<int64_t>(field));
};
break;
}
case arrow::Type::type::FLOAT: {
field_writer = [](int32_t pos, const VariantType& field, RowWriter* writer) -> Status {
return writer->WriteValue<float>(DataDefine::GetVariantValue<float>(field));
};
break;
}
case arrow::Type::type::DOUBLE: {
field_writer = [](int32_t pos, const VariantType& field, RowWriter* writer) -> Status {
return writer->WriteValue<double>(DataDefine::GetVariantValue<double>(field));
};
break;
}
case arrow::Type::type::STRING: {
field_writer = [](int32_t pos, const VariantType& field, RowWriter* writer) -> Status {
const auto* view = DataDefine::GetVariantPtr<std::string_view>(field);
if (view) {
return writer->WriteStringView(*view);
}
return writer->WriteString(DataDefine::GetVariantValue<BinaryString>(field));
};
break;
}
case arrow::Type::type::BINARY: {
field_writer = [](int32_t pos, const VariantType& field, RowWriter* writer) -> Status {
const auto* view = DataDefine::GetVariantPtr<std::string_view>(field);
if (view) {
return writer->WriteStringView(*view);
}
return writer->WriteBinary(
DataDefine::GetVariantValue<std::shared_ptr<Bytes>>(field).get());
};
break;
}
case arrow::Type::type::TIMESTAMP: {
auto timestamp_type =
arrow::internal::checked_pointer_cast<arrow::TimestampType>(field_type);
int32_t precision = DateTimeUtils::GetPrecisionFromType(timestamp_type);
field_writer = [precision](int32_t pos, const VariantType& field,
RowWriter* writer) -> Status {
return writer->WriteTimestamp(DataDefine::GetVariantValue<Timestamp>(field),
precision);
};
break;
}
case arrow::Type::type::DECIMAL: {
auto* decimal_type =
arrow::internal::checked_cast<arrow::Decimal128Type*>(field_type.get());
assert(decimal_type);
auto precision = decimal_type->precision();
field_writer = [precision](int32_t pos, const VariantType& field,
RowWriter* writer) -> Status {
return writer->WriteDecimal(DataDefine::GetVariantValue<Decimal>(field), precision);
};
break;
}
case arrow::Type::type::LIST: {
field_writer = [field_type](int32_t pos, const VariantType& field,
RowWriter* writer) -> Status {
return writer->WriteArray(
DataDefine::GetVariantValue<std::shared_ptr<InternalArray>>(field), field_type);
};
break;
}
case arrow::Type::type::MAP: {
field_writer = [field_type](int32_t pos, const VariantType& field,
RowWriter* writer) -> Status {
return writer->WriteMap(
DataDefine::GetVariantValue<std::shared_ptr<InternalMap>>(field), field_type);
};
break;
}
case arrow::Type::type::STRUCT: {
auto struct_type = arrow::internal::checked_pointer_cast<arrow::StructType>(field_type);
assert(struct_type);
PAIMON_ASSIGN_OR_RAISE(
std::shared_ptr<RowCompactedSerializer> serializer,
RowCompactedSerializer::Create(arrow::schema(struct_type->fields()), pool));
field_writer = [serializer](int32_t pos, const VariantType& field,
RowWriter* writer) -> Status {
return writer->WriteRow(
DataDefine::GetVariantValue<std::shared_ptr<InternalRow>>(field), serializer);
};
break;
}
default:
return Status::Invalid(
fmt::format("type {} not support in FieldWriter in row compacted serializer",
field_type->ToString()));
}
RowCompactedSerializer::FieldWriter ret = [field_writer](int32_t pos, const VariantType& field,
RowWriter* writer) -> Status {
if (DataDefine::IsVariantNull(field)) {
writer->SetNullAt(pos);
return Status::OK();
}
return field_writer(pos, field, writer);
};
return ret;
}
RowCompactedSerializer::RowWriter::RowWriter(int32_t header_size_in_bytes,
const std::shared_ptr<MemoryPool>& pool)
: header_size_in_bytes_(header_size_in_bytes), pool_(pool) {
int32_t initial_capacity = std::max(64, header_size_in_bytes);
SetBuffer(std::make_shared<Bytes>(initial_capacity, pool_.get()));
position_ = header_size_in_bytes_;
}
Status RowCompactedSerializer::RowWriter::WriteDecimal(const Decimal& value, int32_t precision) {
if (Decimal::IsCompact(precision)) {
return WriteValue<int64_t>(value.ToUnscaledLong());
} else {
auto value_bytes = value.ToUnscaledBytes();
return WriteBinary(&value_bytes);
}
}
Status RowCompactedSerializer::RowWriter::WriteTimestamp(const Timestamp& value,
int32_t precision) {
if (Timestamp::IsCompact(precision)) {
return WriteValue<int64_t>(value.GetMillisecond());
} else {
PAIMON_RETURN_NOT_OK(WriteValue<int64_t>(value.GetMillisecond()));
return WriteUnsignedInt(value.GetNanoOfMillisecond());
}
}
Status RowCompactedSerializer::RowWriter::WriteRow(
const std::shared_ptr<InternalRow>& value,
const std::shared_ptr<RowCompactedSerializer>& serializer) {
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<Bytes> bytes, serializer->SerializeToBytes(*value));
return WriteBinary(bytes.get());
}
Status RowCompactedSerializer::RowWriter::WriteArray(const std::shared_ptr<InternalArray>& value,
const std::shared_ptr<arrow::DataType>& type) {
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<BinaryArray> binary_array,
BinarySerializerUtils::WriteBinaryArray(value, type, pool_.get()));
return WriteSegment(binary_array->GetSegment(), binary_array->GetOffset(),
binary_array->GetSizeInBytes());
}
Status RowCompactedSerializer::RowWriter::WriteMap(const std::shared_ptr<InternalMap>& value,
const std::shared_ptr<arrow::DataType>& type) {
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<BinaryMap> binary_map,
BinarySerializerUtils::WriteBinaryMap(value, type, pool_.get()));
return WriteSegment(binary_map->GetSegment(), binary_map->GetOffset(),
binary_map->GetSizeInBytes());
}
std::shared_ptr<Bytes> RowCompactedSerializer::RowWriter::CopyBuffer() const {
return Bytes::CopyOf(*buffer_, position_, pool_.get());
}
Status RowCompactedSerializer::RowWriter::WriteUnsignedInt(int32_t value) {
EnsureCapacity(VarLengthIntUtils::kMaxVarIntSize);
PAIMON_ASSIGN_OR_RAISE(int32_t len,
VarLengthIntUtils::EncodeInt(value, buffer_->data() + position_));
position_ += len;
return Status::OK();
}
Status RowCompactedSerializer::RowWriter::WriteSegment(const MemorySegment& segment, int32_t off,
int32_t len) {
PAIMON_RETURN_NOT_OK(WriteUnsignedInt(len));
EnsureCapacity(len);
MemorySegmentUtils::CopyToBytes({segment}, off, buffer_.get(), position_, len);
position_ += len;
return Status::OK();
}
void RowCompactedSerializer::RowWriter::EnsureCapacity(int32_t size) {
if (static_cast<int32_t>(buffer_->size()) - position_ < size) {
Grow(size);
}
}
void RowCompactedSerializer::RowWriter::Grow(int32_t min_capacity_add) {
auto current_len = static_cast<int32_t>(buffer_->size());
int32_t new_len = std::max(current_len * 2, current_len + min_capacity_add);
auto new_buffer = std::make_shared<Bytes>(new_len, pool_.get());
memcpy(new_buffer->data(), buffer_->data(), current_len);
SetBuffer(std::move(new_buffer));
}
void RowCompactedSerializer::RowWriter::SetBuffer(std::shared_ptr<Bytes> new_buffer) {
buffer_ = std::move(new_buffer);
segment_ = MemorySegment::Wrap(buffer_);
}
void RowCompactedSerializer::RowReader::PointTo(const std::shared_ptr<Bytes>& bytes) {
MemorySegment seg = MemorySegment::Wrap(bytes);
PointTo(seg, 0);
}
void RowCompactedSerializer::RowReader::PointTo(const MemorySegment& segment, int32_t offset) {
segment_ = segment;
offset_ = offset;
position_ = offset + header_size_in_bytes_;
}
Result<const RowKind*> RowCompactedSerializer::RowReader::ReadRowKind() const {
char b = segment_.Get(offset_);
return RowKind::FromByteValue(static_cast<int8_t>(b));
}
Result<std::string_view> RowCompactedSerializer::RowReader::ReadStringView() {
PAIMON_ASSIGN_OR_RAISE(int32_t length, ReadUnsignedInt());
std::string_view str(segment_.Data() + position_, length);
position_ += length;
return str;
}
Result<std::shared_ptr<Bytes>> RowCompactedSerializer::RowReader::ReadBinary() {
PAIMON_ASSIGN_OR_RAISE(int32_t length, ReadUnsignedInt());
auto bytes = std::make_shared<Bytes>(length, pool_.get());
segment_.Get(position_, bytes.get(), /*offset=*/0, length);
position_ += length;
return bytes;
}
Result<Decimal> RowCompactedSerializer::RowReader::ReadDecimal(int32_t precision, int32_t scale) {
if (Decimal::IsCompact(precision)) {
return Decimal::FromUnscaledLong(ReadValue<int64_t>(), precision, scale);
} else {
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<Bytes> bytes, ReadBinary());
return Decimal::FromUnscaledBytes(precision, scale, bytes.get());
}
}
Result<Timestamp> RowCompactedSerializer::RowReader::ReadTimestamp(int32_t precision) {
if (Timestamp::IsCompact(precision)) {
return Timestamp::FromEpochMillis(ReadValue<int64_t>());
}
auto milliseconds = ReadValue<int64_t>();
PAIMON_ASSIGN_OR_RAISE(int32_t nanos_of_millisecond, ReadUnsignedInt());
return Timestamp::FromEpochMillis(milliseconds, nanos_of_millisecond);
}
Result<std::shared_ptr<InternalArray>> RowCompactedSerializer::RowReader::ReadArray() {
auto value = std::make_shared<BinaryArray>();
PAIMON_ASSIGN_OR_RAISE(int32_t length, ReadUnsignedInt());
value->PointTo(segment_, position_, length);
position_ += length;
return value;
}
Result<std::shared_ptr<InternalMap>> RowCompactedSerializer::RowReader::ReadMap() {
auto value = std::make_shared<BinaryMap>();
PAIMON_ASSIGN_OR_RAISE(int32_t length, ReadUnsignedInt());
value->PointTo(segment_, position_, length);
position_ += length;
return value;
}
Result<std::shared_ptr<InternalRow>> RowCompactedSerializer::RowReader::ReadRow(
const std::shared_ptr<RowCompactedSerializer>& serializer) {
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<Bytes> bytes, ReadBinary());
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<InternalRow> result, serializer->Deserialize(bytes));
return result;
}
} // namespace paimon