blob: cb277446d6def2a453b06716dda9abf6990466fc [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <glog/logging.h>
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <ostream>
#include <type_traits>
#include "common/cast_set.h"
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/status.h"
#include "core/column/column.h"
#include "core/data_type/data_type.h"
#include "core/types.h"
#include "storage/olap_common.h"
#include "storage/segment/bitshuffle_wrapper.h"
#include "storage/segment/common.h"
#include "storage/segment/options.h"
#include "storage/segment/page_builder.h"
#include "storage/segment/page_decoder.h"
#include "storage/types.h"
#include "util/alignment.h"
#include "util/coding.h"
#include "util/faststring.h"
#include "util/slice.h"
namespace doris {
namespace segment_v2 {
enum { BITSHUFFLE_PAGE_HEADER_SIZE = 16 };
void warn_with_bitshuffle_error(int64_t val);
// BitshufflePageBuilder bitshuffles and compresses the bits of fixed
// size type blocks with lz4.
//
// The page format is as follows:
//
// 1. Header: (16 bytes total)
//
// <num_elements> [32-bit]
// The number of elements encoded in the page.
//
// <compressed_size> [32-bit]
// The post-compression size of the page, including this header.
//
// <padded_num_elements> [32-bit]
// Padding is needed to meet the requirements of the bitshuffle
// library such that the input/output is a multiple of 8. Some
// ignored elements are appended to the end of the page if necessary
// to meet this requirement.
//
// This header field is the post-padding element count.
//
// <elem_size_bytes> [32-bit]
// The size of the elements, in bytes, as actually encoded. In the
// case that all of the data in a page can fit into a smaller
// integer type, then we may choose to encode that smaller type
// to save CPU costs.
//
// This is currently only implemented in the UINT32 page type.
//
// NOTE: all on-disk ints are encoded little-endian
//
// 2. Element data
//
// The header is followed by the bitshuffle-compressed element data.
//
template <FieldType Type>
class BitshufflePageBuilder : public PageBuilderHelper<BitshufflePageBuilder<Type>> {
public:
using Self = BitshufflePageBuilder<Type>;
friend class PageBuilderHelper<Self>;
Status init() override { return reset(); }
bool is_page_full() override { return _remain_element_capacity == 0; }
Status add(const uint8_t* vals, size_t* count) override {
return add_internal<false>(vals, count);
}
Status single_add(const uint8_t* vals, size_t* count) {
return add_internal<true>(vals, count);
}
template <bool single>
inline Status add_internal(const uint8_t* vals, size_t* num_written) {
DCHECK(!_finished);
if (_remain_element_capacity == 0) {
*num_written = 0;
return Status::OK();
}
// When increasing the size of the memtabl flush threshold to a very large value, for example 15GB.
// the row count of a single men tbl could be very large.
// a real log:
/*
I20250823 19:01:16.153575 2982018 memtable_flush_executor.cpp:185] begin to flush memtable for tablet: 1755915952737, memsize: 15.11 GB, rows: 3751968
*/
// This is not a very wide table, actually it just has two columns, int and array<float>
// The write process of column array has two steps: write nested column(column float here), and write offsets column.
// The row count of column array is 3751968, which is not that big, but each row of column array has 768 float numbers (this is a common case in vector search scenario).
// so the row num of nested column float will be 3751968 * 768 = 2,881,511,424, which is bigger than INT32_MAX.
uint32_t to_add = cast_set<UInt32>(
std::min(cast_set<size_t>(_remain_element_capacity), *num_written));
// Max value of to_add_size is less than STORAGE_PAGE_SIZE_DEFAULT_VALUE
int to_add_size = to_add * SIZE_OF_TYPE;
size_t orig_size = _data.size();
// This may need a large memory, should return error if could not allocated
// successfully, to avoid BE OOM.
RETURN_IF_CATCH_EXCEPTION(_data.resize(orig_size + to_add_size));
_count += to_add;
_remain_element_capacity -= to_add;
_raw_data_size += to_add_size;
// return added number through count
*num_written = to_add;
if constexpr (single) {
if constexpr (SIZE_OF_TYPE == 1) {
*reinterpret_cast<uint8_t*>(&_data[orig_size]) = *vals;
return Status::OK();
} else if constexpr (SIZE_OF_TYPE == 2) {
*reinterpret_cast<uint16_t*>(&_data[orig_size]) =
*reinterpret_cast<const uint16_t*>(vals);
return Status::OK();
} else if constexpr (SIZE_OF_TYPE == 4) {
*reinterpret_cast<uint32_t*>(&_data[orig_size]) =
*reinterpret_cast<const uint32_t*>(vals);
return Status::OK();
} else if constexpr (SIZE_OF_TYPE == 8) {
*reinterpret_cast<uint64_t*>(&_data[orig_size]) =
*reinterpret_cast<const uint64_t*>(vals);
return Status::OK();
}
}
// when single is true and SIZE_OF_TYPE > 8 or single is false
memcpy(&_data[orig_size], vals, to_add_size);
return Status::OK();
}
Status finish(OwnedSlice* slice) override {
RETURN_IF_CATCH_EXCEPTION({ *slice = _finish(SIZE_OF_TYPE); });
return Status::OK();
}
Status reset() override {
RETURN_IF_CATCH_EXCEPTION({
size_t block_size = _options.data_page_size;
_count = 0;
_raw_data_size = 0;
_data.clear();
_data.reserve(block_size);
DCHECK_EQ(reinterpret_cast<uintptr_t>(_data.data()) & (alignof(CppType) - 1), 0)
<< "buffer must be naturally-aligned";
_buffer.clear();
_buffer.resize(BITSHUFFLE_PAGE_HEADER_SIZE);
_finished = false;
_remain_element_capacity = cast_set<UInt32>(block_size / SIZE_OF_TYPE);
});
return Status::OK();
}
size_t count() const override { return _count; }
uint64_t size() const override { return _buffer.size(); }
uint64_t get_raw_data_size() const override { return _raw_data_size; }
private:
BitshufflePageBuilder(const PageBuilderOptions& options)
: _options(options), _count(0), _remain_element_capacity(0), _finished(false) {}
OwnedSlice _finish(int final_size_of_type) {
_data.resize(final_size_of_type * _count);
// Do padding so that the input num of element is multiple of 8.
int num_elems_after_padding = ALIGN_UP(_count, 8);
int padding_elems = num_elems_after_padding - _count;
int padding_bytes = padding_elems * final_size_of_type;
for (int i = 0; i < padding_bytes; i++) {
_data.push_back(0);
}
// reserve enough place for compression
_buffer.resize(
BITSHUFFLE_PAGE_HEADER_SIZE +
bitshuffle::compress_lz4_bound(num_elems_after_padding, final_size_of_type, 0));
int64_t bytes =
bitshuffle::compress_lz4(_data.data(), &_buffer[BITSHUFFLE_PAGE_HEADER_SIZE],
num_elems_after_padding, final_size_of_type, 0);
if (bytes < 0) [[unlikely]] {
// This means the bitshuffle function fails.
// Ideally, this should not happen.
warn_with_bitshuffle_error(bytes);
// It does not matter what will be returned here,
// since we have logged fatal in warn_with_bitshuffle_error().
return OwnedSlice();
}
// update header
encode_fixed32_le(&_buffer[0], _count);
encode_fixed32_le(&_buffer[4], cast_set<uint32_t>(BITSHUFFLE_PAGE_HEADER_SIZE + bytes));
encode_fixed32_le(&_buffer[8], num_elems_after_padding);
encode_fixed32_le(&_buffer[12], final_size_of_type);
_finished = true;
// before build(), update buffer length to the actual compressed size
_buffer.resize(BITSHUFFLE_PAGE_HEADER_SIZE + bytes);
return _buffer.build();
}
using CppType = typename TypeTraits<Type>::CppType;
CppType cell(int idx) const {
DCHECK_GE(idx, 0);
CppType ret;
memcpy(&ret, &_data[idx * SIZE_OF_TYPE], SIZE_OF_TYPE);
return ret;
}
enum { SIZE_OF_TYPE = TypeTraits<Type>::size };
PageBuilderOptions _options;
uint32_t _count;
uint32_t _remain_element_capacity;
bool _finished;
faststring _data;
faststring _buffer;
uint64_t _raw_data_size = 0;
};
inline Status parse_bit_shuffle_header(const Slice& data, size_t& num_elements,
size_t& compressed_size, size_t& num_element_after_padding,
int& size_of_element) {
if (data.size < BITSHUFFLE_PAGE_HEADER_SIZE) {
return Status::InternalError("file corruption: invalid data size:{}, header size:{}",
data.size, BITSHUFFLE_PAGE_HEADER_SIZE);
}
num_elements = decode_fixed32_le((const uint8_t*)&data[0]);
compressed_size = decode_fixed32_le((const uint8_t*)&data[4]);
num_element_after_padding = decode_fixed32_le((const uint8_t*)&data[8]);
size_of_element = decode_fixed32_le((const uint8_t*)&data[12]);
if (num_element_after_padding != ALIGN_UP(num_elements, 8)) {
return Status::InternalError(
"num of element information corrupted,"
" _num_element_after_padding:{}, _num_elements:{}, expected_padding:{},"
" compressed_size:{}, size_of_element:{}, data_size:{}",
num_element_after_padding, num_elements, ALIGN_UP(num_elements, 8), compressed_size,
size_of_element, data.size);
}
switch (size_of_element) {
case 1:
case 2:
case 3:
case 4:
case 8:
case 12:
case 16:
case 32:
break;
default:
return Status::InternalError("invalid size_of_elem:{}", size_of_element);
}
return Status::OK();
}
template <FieldType Type>
class BitShufflePageDecoder : public PageDecoder {
public:
BitShufflePageDecoder(Slice data, const PageDecoderOptions& options)
: _data(data),
_options(options),
_parsed(false),
_num_elements(0),
_num_element_after_padding(0),
_size_of_element(0),
_cur_index(0) {}
Status init() override {
CHECK(!_parsed);
size_t unused;
RETURN_IF_ERROR(parse_bit_shuffle_header(_data, _num_elements, unused,
_num_element_after_padding, _size_of_element));
if (_data.size !=
_num_element_after_padding * _size_of_element + BITSHUFFLE_PAGE_HEADER_SIZE) {
std::stringstream ss;
ss << "Size information unmatched, _data.size:" << _data.size
<< ", _num_elements:" << _num_elements << ", expected size is "
<< _num_element_after_padding * _size_of_element + BITSHUFFLE_PAGE_HEADER_SIZE;
return Status::InternalError(ss.str());
}
// Currently, only the UINT32 block encoder supports expanding size:
if (UNLIKELY(Type != FieldType::OLAP_FIELD_TYPE_UNSIGNED_INT &&
_size_of_element != SIZE_OF_TYPE)) {
return Status::InternalError(
"invalid size info. size of element:{}, SIZE_OF_TYPE:{}, type:{}",
_size_of_element, SIZE_OF_TYPE, Type);
}
if (UNLIKELY(_size_of_element > SIZE_OF_TYPE)) {
return Status::InternalError("invalid size info. size of element:{}, SIZE_OF_TYPE:{}",
_size_of_element, SIZE_OF_TYPE);
}
_parsed = true;
return Status::OK();
}
// If the page only contains null, then _num_elements == 0, and the nullmap has
// some value. But in _seek_columns --> seek_to_ordinal --> _seek_to_pos_in_page
// in this call stack it will pass pos == 0 to this method. Although we can add some
// code such as check if the count == 0, then skip seek, but there are other method such
// as init will also call seek with pos == 0. And the seek is useless when _num_elements
// == 0, because next batch will return empty in this method.
Status seek_to_position_in_page(size_t pos) override {
DCHECK(_parsed) << "Must call init()";
if (_num_elements == 0) [[unlikely]] {
if (pos != 0) {
return Status::Error<ErrorCode::INTERNAL_ERROR, false>(
"seek pos {} is larger than total elements {}", pos, _num_elements);
}
}
DCHECK_LE(pos, _num_elements);
_cur_index = pos;
return Status::OK();
}
Status seek_at_or_after_value(const void* value, bool* exact_match) override {
DCHECK(_parsed) << "Must call init() firstly";
if (_num_elements == 0) {
return Status::Error<ErrorCode::ENTRY_NOT_FOUND>("page is empty");
}
size_t left = 0;
size_t right = _num_elements;
void* mid_value = nullptr;
// find the first value >= target. after loop,
// - left == index of first value >= target when found
// - left == _num_elements when not found (all values < target)
while (left < right) {
size_t mid = left + (right - left) / 2;
mid_value = get_data(mid);
if (TypeTraits<Type>::cmp(mid_value, value) < 0) {
left = mid + 1;
} else {
right = mid;
}
}
if (left >= _num_elements) {
return Status::Error<ErrorCode::ENTRY_NOT_FOUND>("all value small than the value");
}
void* find_value = get_data(left);
if (TypeTraits<Type>::cmp(find_value, value) == 0) {
*exact_match = true;
} else {
*exact_match = false;
}
_cur_index = left;
return Status::OK();
}
template <bool forward_index = true>
Status next_batch(size_t* n, MutableColumnPtr& dst) {
DCHECK(_parsed);
if (*n == 0 || _cur_index >= _num_elements) [[unlikely]] {
*n = 0;
return Status::OK();
}
size_t max_fetch = std::min(*n, static_cast<size_t>(_num_elements - _cur_index));
dst->insert_many_fix_len_data(get_data(_cur_index), max_fetch);
*n = max_fetch;
if constexpr (forward_index) {
_cur_index += max_fetch;
}
return Status::OK();
}
Status next_batch(size_t* n, MutableColumnPtr& dst) override { return next_batch<>(n, dst); }
Status read_by_rowids(const rowid_t* rowids, ordinal_t page_first_ordinal, size_t* n,
MutableColumnPtr& dst) override {
DCHECK(_parsed);
if (*n == 0) [[unlikely]] {
*n = 0;
return Status::OK();
}
auto total = *n;
auto read_count = 0;
_buffer.resize(total);
for (size_t i = 0; i < total; ++i) {
ordinal_t ord = rowids[i] - page_first_ordinal;
if (UNLIKELY(ord >= _num_elements)) {
break;
}
_buffer[read_count++] = *reinterpret_cast<CppType*>(get_data(ord));
}
if (LIKELY(read_count > 0)) {
dst->insert_many_fix_len_data((char*)_buffer.data(), read_count);
}
*n = read_count;
return Status::OK();
}
Status peek_next_batch(size_t* n, MutableColumnPtr& dst) override {
return next_batch<false>(n, dst);
}
size_t count() const override { return _num_elements; }
size_t current_index() const override { return _cur_index; }
char* get_data(size_t index) const {
return &_data.data[BITSHUFFLE_PAGE_HEADER_SIZE + index * SIZE_OF_TYPE];
}
private:
void _copy_next_values(size_t n, void* data) {
memcpy(data, get_data(_cur_index), n * SIZE_OF_TYPE);
}
using CppType = typename TypeTraits<Type>::CppType;
enum { SIZE_OF_TYPE = TypeTraits<Type>::size };
Slice _data;
PageDecoderOptions _options;
bool _parsed;
size_t _num_elements;
size_t _num_element_after_padding;
int _size_of_element;
size_t _cur_index;
std::vector<std::conditional_t<std::is_same_v<CppType, bool>, uint8_t, CppType>> _buffer;
friend class BinaryDictPageDecoder;
};
} // namespace segment_v2
} // namespace doris