blob: 0bcd15501e248a16e7946196154896c974d9f1a5 [file] [log] [blame]
/*
* Copyright 2024-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <cstdint>
#include <cstring>
#include <memory>
#include <utility>
#include "fmt/format.h"
#include "paimon/common/data/binary_row.h"
#include "paimon/common/io/memory_segment_output_stream.h"
#include "paimon/common/memory/memory_segment.h"
#include "paimon/common/memory/memory_segment_utils.h"
#include "paimon/common/utils/math.h"
#include "paimon/io/byte_order.h"
#include "paimon/io/data_input_stream.h"
#include "paimon/macros.h"
#include "paimon/memory/bytes.h"
#include "paimon/memory/memory_pool.h"
#include "paimon/result.h"
#include "paimon/status.h"
#include "paimon/type_fwd.h"
namespace paimon {
/// Utils for serialization.
class SerializationUtils {
public:
SerializationUtils() = delete;
~SerializationUtils() = delete;
/// Serialize `BinaryRow`, the difference between this and `BinaryRowSerializer` is
/// that arity is also serialized here, so the deserialization is schemaless.
static std::shared_ptr<Bytes> SerializeBinaryRow(const BinaryRow& row, MemoryPool* pool) {
int32_t size_in_row = row.GetSizeInBytes();
auto bytes = Bytes::AllocateBytes(size_in_row + sizeof(int32_t), pool);
int32_t arity = row.GetFieldCount();
if (SystemByteOrder() == ByteOrder::PAIMON_LITTLE_ENDIAN) {
arity = EndianSwapValue(arity);
}
memcpy(bytes->data(), &arity, sizeof(int32_t));
MemorySegmentUtils::CopyToBytes(row.GetSegments(), row.GetOffset(), bytes.get(),
sizeof(int32_t), size_in_row);
return bytes;
}
static Status SerializeBinaryRow(const BinaryRow& row, MemorySegmentOutputStream* out) {
if (row.GetSizeInBytes() < 0) {
return Status::Invalid(
fmt::format("bytes size {} is less than 0", row.GetSizeInBytes()));
}
out->WriteValue<int32_t>(4 + row.GetSizeInBytes());
out->WriteValue<int32_t>(row.GetFieldCount());
return MemorySegmentUtils::CopyToStream(row.GetSegments(), row.GetOffset(),
row.GetSizeInBytes(), out);
}
/// Schemaless deserialization for `BinaryRow`.
static Result<BinaryRow> DeserializeBinaryRow(const std::shared_ptr<Bytes>& bytes) {
if (PAIMON_UNLIKELY(bytes->size() < 4)) {
return Status::Invalid(fmt::format("bytes size {} is less than 4", bytes->size()));
}
int32_t arity = *(reinterpret_cast<int32_t*>(bytes->data()));
if (SystemByteOrder() == ByteOrder::PAIMON_LITTLE_ENDIAN) {
arity = EndianSwapValue(arity);
}
if (PAIMON_UNLIKELY(arity < 0)) {
return Status::Invalid("arity is less than 0");
}
BinaryRow row(arity);
row.PointTo(MemorySegment::Wrap(bytes), 4, bytes->size() - 4);
return row;
}
/// Schemaless deserialization for `BinaryRow` from a `DataInputStream`.
static Result<BinaryRow> DeserializeBinaryRow(DataInputStream* input, MemoryPool* pool) {
int32_t read_length = -1;
PAIMON_ASSIGN_OR_RAISE(read_length, input->ReadValue<int32_t>());
std::shared_ptr<Bytes> bytes = Bytes::AllocateBytes(read_length, pool);
PAIMON_RETURN_NOT_OK(input->ReadBytes(bytes.get()));
return DeserializeBinaryRow(bytes);
}
};
} // namespace paimon