| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| #include "paimon/common/data/abstract_binary_writer.h" |
| |
| #include <cassert> |
| #include <memory> |
| #include <optional> |
| |
| #include "paimon/common/data/binary_array.h" |
| #include "paimon/common/data/binary_map.h" |
| #include "paimon/common/data/binary_row.h" |
| #include "paimon/common/data/binary_section.h" |
| #include "paimon/common/data/binary_string.h" |
| #include "paimon/common/memory/memory_segment_utils.h" |
| #include "paimon/data/decimal.h" |
| #include "paimon/data/timestamp.h" |
| #include "paimon/io/byte_order.h" |
| #include "paimon/memory/bytes.h" |
| #include "paimon/memory/memory_pool.h" |
| |
| namespace paimon { |
| |
| void AbstractBinaryWriter::WriteBytes(int32_t pos, const Bytes& bytes) { |
| int32_t len = bytes.size(); |
| if (len <= BinarySection::MAX_FIX_PART_DATA_SIZE) { |
| WriteBytesToFixLenPart(&segment_, GetFieldOffset(pos), bytes, len); |
| } else { |
| WriteBytesToVarLenPart(pos, bytes, len); |
| } |
| } |
| |
| void AbstractBinaryWriter::WriteString(int32_t pos, const BinaryString& input) { |
| int32_t len = input.GetSizeInBytes(); |
| if (len <= BinarySection::MAX_FIX_PART_DATA_SIZE) { |
| auto bytes = Bytes::AllocateBytes(len, pool_); |
| MemorySegmentUtils::CopyToBytes({input.GetSegment()}, input.GetOffset(), bytes.get(), 0, |
| len); |
| WriteBytesToFixLenPart(&segment_, GetFieldOffset(pos), *bytes, len); |
| } else { |
| WriteSegmentToVarLenPart(pos, input.GetSegment(), input.GetOffset(), len); |
| } |
| } |
| |
| void AbstractBinaryWriter::WriteBinary(int32_t pos, const Bytes& bytes) { |
| int32_t len = bytes.size(); |
| if (len <= BinarySection::MAX_FIX_PART_DATA_SIZE) { |
| WriteBytesToFixLenPart(&segment_, GetFieldOffset(pos), bytes, len); |
| } else { |
| WriteBytesToVarLenPart(pos, bytes, len); |
| } |
| } |
| |
| void AbstractBinaryWriter::WriteStringView(int32_t pos, const std::string_view& view) { |
| int32_t len = view.size(); |
| if (len <= BinarySection::MAX_FIX_PART_DATA_SIZE) { |
| WriteBytesToFixLenPart(&segment_, GetFieldOffset(pos), view, len); |
| } else { |
| WriteBytesToVarLenPart(pos, view, len); |
| } |
| } |
| |
| void AbstractBinaryWriter::WriteRow(int32_t pos, const BinaryRow& input) { |
| return WriteSegmentToVarLenPart(pos, input.GetSegment(), input.GetOffset(), |
| input.GetSizeInBytes()); |
| } |
| |
| void AbstractBinaryWriter::WriteArray(int32_t pos, const BinaryArray& input) { |
| return WriteSegmentToVarLenPart(pos, input.GetSegment(), input.GetOffset(), |
| input.GetSizeInBytes()); |
| } |
| |
| void AbstractBinaryWriter::WriteMap(int32_t pos, const BinaryMap& input) { |
| return WriteSegmentToVarLenPart(pos, input.GetSegment(), input.GetOffset(), |
| input.GetSizeInBytes()); |
| } |
| void AbstractBinaryWriter::WriteDecimal(int32_t pos, const std::optional<Decimal>& value, |
| int32_t precision) { |
| assert(value == std::nullopt || precision == value.value().Precision()); |
| if (Decimal::IsCompact(precision)) { |
| assert(value != std::nullopt); |
| WriteLong(pos, value.value().ToUnscaledLong()); |
| } else { |
| // grow the global buffer before writing data. |
| EnsureCapacity(16); |
| // zero-out 16 bytes |
| segment_.PutValue<int64_t>(cursor_, 0ll); |
| segment_.PutValue<int64_t>(cursor_ + 8, 0ll); |
| |
| // Make sure Decimal object has the same scale as DecimalType. |
| // Note that we may pass in null Decimal object to set null for it. |
| if (value == std::nullopt) { |
| SetNullBit(pos); |
| SetOffsetAndSize(pos, cursor_, 0l); |
| } else { |
| auto bytes = value.value().ToUnscaledBytes(); |
| segment_.Put(cursor_, bytes, 0, bytes.size()); |
| SetOffsetAndSize(pos, cursor_, bytes.size()); |
| } |
| // move the cursor forward. |
| cursor_ += 16; |
| } |
| } |
| |
| void AbstractBinaryWriter::WriteTimestamp(int32_t pos, const std::optional<Timestamp>& value, |
| int32_t precision) { |
| if (Timestamp::IsCompact(precision)) { |
| assert(value != std::nullopt); |
| WriteLong(pos, value.value().GetMillisecond()); |
| } else { |
| // store the nanoOfMillisecond in fixed-length part as offset and nanoOfMillisecond |
| EnsureCapacity(8); |
| |
| if (value == std::nullopt) { |
| SetNullBit(pos); |
| // zero-out the bytes |
| segment_.PutValue<int64_t>(cursor_, 0l); |
| SetOffsetAndSize(pos, cursor_, 0l); |
| } else { |
| segment_.PutValue<int64_t>(cursor_, value.value().GetMillisecond()); |
| SetOffsetAndSize(pos, cursor_, value.value().GetNanoOfMillisecond()); |
| } |
| cursor_ += 8; |
| } |
| } |
| |
| void AbstractBinaryWriter::ZeroOutPaddingBytes(int32_t num_bytes) { |
| if ((num_bytes & 0x07) > 0) { |
| segment_.PutValue<int64_t>(cursor_ + ((num_bytes >> 3) << 3), 0L); |
| } |
| } |
| |
| void AbstractBinaryWriter::EnsureCapacity(int32_t needed_size) { |
| const int32_t length = cursor_ + needed_size; |
| if (segment_.Size() < length) { |
| Grow(length); |
| } |
| } |
| |
| void AbstractBinaryWriter::WriteSegmentToVarLenPart(int32_t pos, const MemorySegment& segment, |
| int32_t offset, int32_t size) { |
| const int32_t rounded_size = RoundNumberOfBytesToNearestWord(size); |
| // grow the global buffer before writing data. |
| EnsureCapacity(rounded_size); |
| ZeroOutPaddingBytes(size); |
| |
| segment.CopyTo(offset, &segment_, cursor_, size); |
| SetOffsetAndSize(pos, cursor_, size); |
| // move the cursor forward. |
| cursor_ += rounded_size; |
| } |
| |
| template <typename T> |
| void AbstractBinaryWriter::WriteBytesToVarLenPart(int32_t pos, const T& bytes, int32_t len) { |
| const int32_t rounded_size = RoundNumberOfBytesToNearestWord(len); |
| // grow the global buffer before writing data. |
| EnsureCapacity(rounded_size); |
| ZeroOutPaddingBytes(len); |
| // Write the bytes to the variable length portion. |
| segment_.Put(cursor_, bytes, 0, len); |
| SetOffsetAndSize(pos, cursor_, len); |
| // move the cursor forward. |
| cursor_ += rounded_size; |
| } |
| |
| void AbstractBinaryWriter::Grow(int32_t min_capacity) { |
| int32_t old_capacity = segment_.Size(); |
| int32_t new_capacity = old_capacity + (old_capacity >> 1); |
| if (new_capacity - min_capacity < 0) { |
| new_capacity = min_capacity; |
| } |
| std::shared_ptr<Bytes> new_bytes = |
| Bytes::CopyOf(*(segment_.GetOrCreateHeapMemory(pool_)), new_capacity, pool_); |
| segment_ = MemorySegment::Wrap(new_bytes); |
| AfterGrow(); |
| } |
| |
| int32_t AbstractBinaryWriter::RoundNumberOfBytesToNearestWord(int32_t num_bytes) { |
| int32_t remainder = num_bytes & 0x07; |
| if (remainder == 0) { |
| return num_bytes; |
| } else { |
| return num_bytes + (8 - remainder); |
| } |
| } |
| |
| template <typename T> |
| void AbstractBinaryWriter::WriteBytesToFixLenPart(MemorySegment* segment, int32_t field_offset, |
| const T& bytes, int32_t len) { |
| const uint64_t first_byte = |
| static_cast<uint64_t>(len) | 0x80U; // first bit is 1, low 7 bits are len |
| uint64_t seven_bytes = 0U; // real data |
| if ((SystemByteOrder() == ByteOrder::PAIMON_LITTLE_ENDIAN)) { |
| for (int32_t i = 0; i < len; i++) { |
| seven_bytes |= (static_cast<uint64_t>(bytes[i]) & 0xFFU) << (i * 8U); |
| } |
| } else { |
| for (int32_t i = 0; i < len; i++) { |
| seven_bytes |= (static_cast<uint64_t>(bytes[i]) & 0xFFU) << ((6 - i) * 8U); |
| } |
| } |
| const uint64_t offset_and_size = (first_byte << 56) | seven_bytes; |
| segment->PutValue<int64_t>(field_offset, static_cast<int64_t>(offset_and_size)); |
| } |
| |
| } // namespace paimon |