blob: b8d99c8044dee4ea4cb45eb5459040e325ba6968 [file] [log] [blame]
/*
* Copyright 2024-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "paimon/common/data/abstract_binary_writer.h"
#include <algorithm>
#include <cassert>
#include <memory>
#include <optional>
#include "paimon/common/data/binary_array.h"
#include "paimon/common/data/binary_row.h"
#include "paimon/common/data/binary_section.h"
#include "paimon/common/data/binary_string.h"
#include "paimon/common/memory/memory_segment_utils.h"
#include "paimon/data/decimal.h"
#include "paimon/data/timestamp.h"
#include "paimon/io/byte_order.h"
#include "paimon/memory/bytes.h"
#include "paimon/memory/memory_pool.h"
namespace paimon {
void AbstractBinaryWriter::WriteBytes(int32_t pos, const Bytes& bytes) {
int32_t len = bytes.size();
if (len <= BinarySection::MAX_FIX_PART_DATA_SIZE) {
WriteBytesToFixLenPart(&segment_, GetFieldOffset(pos), bytes, len);
} else {
WriteBytesToVarLenPart(pos, bytes, len);
}
}
void AbstractBinaryWriter::WriteString(int32_t pos, const BinaryString& input) {
int32_t len = input.GetSizeInBytes();
if (len <= BinarySection::MAX_FIX_PART_DATA_SIZE) {
auto bytes = Bytes::AllocateBytes(len, pool_);
MemorySegmentUtils::CopyToBytes(input.GetSegments(), input.GetOffset(), bytes.get(), 0,
len);
WriteBytesToFixLenPart(&segment_, GetFieldOffset(pos), *bytes, len);
} else {
WriteSegmentsToVarLenPart(pos, input.GetSegments(), input.GetOffset(), len);
}
}
void AbstractBinaryWriter::WriteBinary(int32_t pos, const Bytes& bytes) {
int32_t len = bytes.size();
if (len <= BinarySection::MAX_FIX_PART_DATA_SIZE) {
WriteBytesToFixLenPart(&segment_, GetFieldOffset(pos), bytes, len);
} else {
WriteBytesToVarLenPart(pos, bytes, len);
}
}
void AbstractBinaryWriter::WriteStringView(int32_t pos, const std::string_view& view) {
int32_t len = view.size();
if (len <= BinarySection::MAX_FIX_PART_DATA_SIZE) {
WriteBytesToFixLenPart(&segment_, GetFieldOffset(pos), view, len);
} else {
WriteBytesToVarLenPart(pos, view, len);
}
}
void AbstractBinaryWriter::WriteRow(int32_t pos, const BinaryRow& input) {
return WriteSegmentsToVarLenPart(pos, input.GetSegments(), input.GetOffset(),
input.GetSizeInBytes());
}
void AbstractBinaryWriter::WriteArray(int32_t pos, const BinaryArray& input) {
return WriteSegmentsToVarLenPart(pos, input.GetSegments(), input.GetOffset(),
input.GetSizeInBytes());
}
void AbstractBinaryWriter::WriteDecimal(int32_t pos, const std::optional<Decimal>& value,
int32_t precision) {
assert(value == std::nullopt || precision == value.value().Precision());
if (Decimal::IsCompact(precision)) {
assert(value != std::nullopt);
WriteLong(pos, value.value().ToUnscaledLong());
} else {
// grow the global buffer before writing data.
EnsureCapacity(16);
// zero-out 16 bytes
segment_.PutValue<int64_t>(cursor_, 0ll);
segment_.PutValue<int64_t>(cursor_ + 8, 0ll);
// Make sure Decimal object has the same scale as DecimalType.
// Note that we may pass in null Decimal object to set null for it.
if (value == std::nullopt) {
SetNullBit(pos);
SetOffsetAndSize(pos, cursor_, 0l);
} else {
auto bytes = value.value().ToUnscaledBytes();
segment_.Put(cursor_, bytes, 0, bytes.size());
SetOffsetAndSize(pos, cursor_, bytes.size());
}
// move the cursor forward.
cursor_ += 16;
}
}
void AbstractBinaryWriter::WriteTimestamp(int32_t pos, const std::optional<Timestamp>& value,
int32_t precision) {
if (Timestamp::IsCompact(precision)) {
assert(value != std::nullopt);
WriteLong(pos, value.value().GetMillisecond());
} else {
// store the nanoOfMillisecond in fixed-length part as offset and nanoOfMillisecond
EnsureCapacity(8);
if (value == std::nullopt) {
SetNullBit(pos);
// zero-out the bytes
segment_.PutValue<int64_t>(cursor_, 0l);
SetOffsetAndSize(pos, cursor_, 0l);
} else {
segment_.PutValue<int64_t>(cursor_, value.value().GetMillisecond());
SetOffsetAndSize(pos, cursor_, value.value().GetNanoOfMillisecond());
}
cursor_ += 8;
}
}
void AbstractBinaryWriter::ZeroOutPaddingBytes(int32_t num_bytes) {
if ((num_bytes & 0x07) > 0) {
segment_.PutValue<int64_t>(cursor_ + ((num_bytes >> 3) << 3), 0L);
}
}
void AbstractBinaryWriter::EnsureCapacity(int32_t needed_size) {
const int32_t length = cursor_ + needed_size;
if (segment_.Size() < length) {
Grow(length);
}
}
void AbstractBinaryWriter::WriteSegmentsToVarLenPart(int32_t pos,
const std::vector<MemorySegment>& segments,
int32_t offset, int32_t size) {
const int32_t rounded_size = RoundNumberOfBytesToNearestWord(size);
// grow the global buffer before writing data.
EnsureCapacity(rounded_size);
ZeroOutPaddingBytes(size);
if (segments.size() == 1) {
segments[0].CopyTo(offset, &segment_, cursor_, size);
} else {
WriteMultiSegmentsToVarLenPart(segments, offset, size);
}
SetOffsetAndSize(pos, cursor_, size);
// move the cursor forward.
cursor_ += rounded_size;
}
void AbstractBinaryWriter::WriteMultiSegmentsToVarLenPart(
const std::vector<MemorySegment>& segments, int32_t offset, int32_t size) {
// Write the bytes to the variable length portion.
int32_t need_copy = size;
int32_t from_offset = offset;
int32_t to_offset = cursor_;
for (const auto& source_segment : segments) {
int32_t remain = source_segment.Size() - from_offset;
if (remain > 0) {
int32_t copy_size = std::min(remain, need_copy);
source_segment.CopyTo(from_offset, &segment_, to_offset, copy_size);
need_copy -= copy_size;
to_offset += copy_size;
from_offset = 0;
} else {
from_offset -= source_segment.Size();
}
}
}
template <typename T>
void AbstractBinaryWriter::WriteBytesToVarLenPart(int32_t pos, const T& bytes, int32_t len) {
const int32_t rounded_size = RoundNumberOfBytesToNearestWord(len);
// grow the global buffer before writing data.
EnsureCapacity(rounded_size);
ZeroOutPaddingBytes(len);
// Write the bytes to the variable length portion.
segment_.Put(cursor_, bytes, 0, len);
SetOffsetAndSize(pos, cursor_, len);
// move the cursor forward.
cursor_ += rounded_size;
}
void AbstractBinaryWriter::Grow(int32_t min_capacity) {
int32_t old_capacity = segment_.Size();
int32_t new_capacity = old_capacity + (old_capacity >> 1);
if (new_capacity - min_capacity < 0) {
new_capacity = min_capacity;
}
std::shared_ptr<Bytes> new_bytes = Bytes::CopyOf(*(segment_.GetArray()), new_capacity, pool_);
segment_ = MemorySegment::Wrap(new_bytes);
AfterGrow();
}
int32_t AbstractBinaryWriter::RoundNumberOfBytesToNearestWord(int32_t num_bytes) {
int32_t remainder = num_bytes & 0x07;
if (remainder == 0) {
return num_bytes;
} else {
return num_bytes + (8 - remainder);
}
}
template <typename T>
void AbstractBinaryWriter::WriteBytesToFixLenPart(MemorySegment* segment, int32_t field_offset,
const T& bytes, int32_t len) {
int64_t first_byte = len | 0x80; // first bit is 1, other bits is len
int64_t seven_bytes = 0L; // real data
if ((SystemByteOrder() == ByteOrder::PAIMON_LITTLE_ENDIAN)) {
for (int32_t i = 0; i < len; i++) {
seven_bytes |= ((0x00000000000000FFL & bytes[i]) << (i * 8L));
}
} else {
for (int32_t i = 0; i < len; i++) {
seven_bytes |= ((0x00000000000000FFL & bytes[i]) << ((6 - i) * 8L));
}
}
const int64_t offset_and_size =
(first_byte << 56) | // NOLINT(clang-analyzer-core.UndefinedBinaryOperatorResult)
seven_bytes;
segment->PutValue<int64_t>(field_offset, offset_and_size);
}
} // namespace paimon