| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "olap/rowset/segment_v2/binary_dict_page.h" |
| |
| #include <gen_cpp/olap_file.pb.h> |
| #include <gen_cpp/segment_v2.pb.h> |
| |
| #include <algorithm> |
| #include <ostream> |
| #include <utility> |
| |
| #include "common/compiler_util.h" // IWYU pragma: keep |
| #include "common/config.h" |
| #include "common/logging.h" |
| #include "common/status.h" |
| #include "olap/rowset/segment_v2/binary_plain_page_v2.h" |
| #include "olap/rowset/segment_v2/bitshuffle_page.h" |
| #include "olap/rowset/segment_v2/encoding_info.h" |
| #include "util/coding.h" |
| #include "util/slice.h" // for Slice |
| #include "vec/columns/column.h" |
| |
| namespace doris { |
| #include "common/compile_check_begin.h" |
| struct StringRef; |
| |
| namespace segment_v2 { |
| |
| BinaryDictPageBuilder::BinaryDictPageBuilder(const PageBuilderOptions& options) |
| : _options(options), |
| _finished(false), |
| _data_page_builder(nullptr), |
| _dict_builder(nullptr), |
| _encoding_type(DICT_ENCODING), |
| _dict_word_page_encoding_type( |
| options.encoding_preference.binary_plain_encoding_default_impl == |
| BinaryPlainEncodingTypePB::BINARY_PLAIN_ENCODING_V2 |
| ? PLAIN_ENCODING_V2 |
| : PLAIN_ENCODING), |
| _fallback_binary_encoding_type( |
| options.encoding_preference.binary_plain_encoding_default_impl == |
| BinaryPlainEncodingTypePB::BINARY_PLAIN_ENCODING_V2 |
| ? PLAIN_ENCODING_V2 |
| : PLAIN_ENCODING) {} |
| |
| Status BinaryDictPageBuilder::init() { |
| // initially use DICT_ENCODING |
| // TODO: the data page builder type can be created by Factory according to user config |
| PageBuilder* data_page_builder_ptr = nullptr; |
| RETURN_IF_ERROR(BitshufflePageBuilder<FieldType::OLAP_FIELD_TYPE_INT>::create( |
| &data_page_builder_ptr, _options)); |
| _data_page_builder.reset(data_page_builder_ptr); |
| PageBuilderOptions dict_builder_options; |
| // here the binary plain page is used to store the dictionary items so |
| // the data page size is set to the same as the dict page size |
| dict_builder_options.data_page_size = _options.dict_page_size; |
| dict_builder_options.dict_page_size = _options.dict_page_size; |
| dict_builder_options.is_dict_page = true; |
| |
| const EncodingInfo* encoding_info; |
| RETURN_IF_ERROR(EncodingInfo::get(FieldType::OLAP_FIELD_TYPE_VARCHAR, |
| _dict_word_page_encoding_type, {}, &encoding_info)); |
| RETURN_IF_ERROR(encoding_info->create_page_builder(dict_builder_options, _dict_builder)); |
| return reset(); |
| } |
| |
| bool BinaryDictPageBuilder::is_page_full() { |
| if (_data_page_builder->is_page_full()) { |
| return true; |
| } |
| if (_encoding_type == DICT_ENCODING && _dict_builder->is_page_full()) { |
| return true; |
| } |
| return false; |
| } |
| |
| Status BinaryDictPageBuilder::add(const uint8_t* vals, size_t* count) { |
| if (_encoding_type == DICT_ENCODING) { |
| DCHECK(!_finished); |
| DCHECK_GT(*count, 0); |
| const Slice* src = reinterpret_cast<const Slice*>(vals); |
| size_t num_added = 0; |
| uint32_t value_code = -1; |
| auto* actual_builder = dynamic_cast<BitshufflePageBuilder<FieldType::OLAP_FIELD_TYPE_INT>*>( |
| _data_page_builder.get()); |
| |
| if (_data_page_builder->count() == 0) { |
| _first_value.assign_copy(reinterpret_cast<const uint8_t*>(src->get_data()), |
| src->get_size()); |
| } |
| |
| for (int i = 0; i < *count; ++i, ++src) { |
| if (is_page_full()) { |
| break; |
| } |
| |
| if (src->empty() && _has_empty) { |
| value_code = _empty_code; |
| } else if (auto iter = _dictionary.find(*src); iter != _dictionary.end()) { |
| value_code = iter->second; |
| } else { |
| Slice dict_item(src->data, src->size); |
| if (src->size > 0) { |
| char* item_mem = _arena.alloc(src->size); |
| if (item_mem == nullptr) { |
| return Status::MemoryAllocFailed("memory allocate failed, size:{}", |
| src->size); |
| } |
| dict_item.relocate(item_mem); |
| } |
| value_code = cast_set<uint32_t>(_dictionary.size()); |
| size_t add_count = 1; |
| RETURN_IF_ERROR(_dict_builder->add(reinterpret_cast<const uint8_t*>(&dict_item), |
| &add_count)); |
| if (add_count == 0) { |
| // current dict page is full, stop processing remaining inputs |
| break; |
| } |
| _dictionary.emplace(dict_item, value_code); |
| if (src->empty()) { |
| _has_empty = true; |
| _empty_code = value_code; |
| } |
| } |
| size_t add_count = 1; |
| RETURN_IF_ERROR(actual_builder->single_add( |
| reinterpret_cast<const uint8_t*>(&value_code), &add_count)); |
| if (add_count == 0) { |
| // current data page is full, stop processing remaining inputs |
| break; |
| } |
| // Track raw data size: the original string size |
| _raw_data_size += src->size; |
| num_added += 1; |
| } |
| *count = num_added; |
| return Status::OK(); |
| } else { |
| DCHECK(_encoding_type == PLAIN_ENCODING || _encoding_type == PLAIN_ENCODING_V2); |
| RETURN_IF_ERROR(_data_page_builder->add(vals, count)); |
| // For plain encoding, track raw data size from the input |
| const Slice* src = reinterpret_cast<const Slice*>(vals); |
| for (size_t i = 0; i < *count; ++i) { |
| _raw_data_size += src[i].size; |
| } |
| return Status::OK(); |
| } |
| } |
| |
| Status BinaryDictPageBuilder::finish(OwnedSlice* slice) { |
| if (VLOG_DEBUG_IS_ON && _encoding_type == DICT_ENCODING) { |
| VLOG_DEBUG << "dict page size:" << _dict_builder->size(); |
| } |
| |
| DCHECK(!_finished); |
| _finished = true; |
| |
| OwnedSlice data_slice; |
| RETURN_IF_ERROR(_data_page_builder->finish(&data_slice)); |
| // TODO(gaodayue) separate page header and content to avoid this copy |
| RETURN_IF_CATCH_EXCEPTION( |
| { _buffer.append(data_slice.slice().data, data_slice.slice().size); }); |
| encode_fixed32_le(&_buffer[0], _encoding_type); |
| *slice = _buffer.build(); |
| return Status::OK(); |
| } |
| |
| Status BinaryDictPageBuilder::reset() { |
| RETURN_IF_CATCH_EXCEPTION({ |
| _finished = false; |
| _raw_data_size = 0; |
| _buffer.reserve(_options.data_page_size + BINARY_DICT_PAGE_HEADER_SIZE); |
| _buffer.resize(BINARY_DICT_PAGE_HEADER_SIZE); |
| |
| if (_encoding_type == DICT_ENCODING && _dict_builder->is_page_full()) { |
| DCHECK(_fallback_binary_encoding_type == PLAIN_ENCODING || |
| _fallback_binary_encoding_type == PLAIN_ENCODING_V2); |
| const EncodingInfo* encoding_info; |
| RETURN_IF_ERROR(EncodingInfo::get(FieldType::OLAP_FIELD_TYPE_VARCHAR, |
| _fallback_binary_encoding_type, {}, &encoding_info)); |
| RETURN_IF_ERROR(encoding_info->create_page_builder(_options, _data_page_builder)); |
| _encoding_type = _fallback_binary_encoding_type; |
| } else { |
| RETURN_IF_ERROR(_data_page_builder->reset()); |
| } |
| }); |
| return Status::OK(); |
| } |
| |
| size_t BinaryDictPageBuilder::count() const { |
| return _data_page_builder->count(); |
| } |
| |
| uint64_t BinaryDictPageBuilder::size() const { |
| return _arena.used_size() + _data_page_builder->size(); |
| } |
| |
| Status BinaryDictPageBuilder::get_dictionary_page(OwnedSlice* dictionary_page) { |
| return _dict_builder->finish(dictionary_page); |
| } |
| |
| Status BinaryDictPageBuilder::get_dictionary_page_encoding(EncodingTypePB* encoding) const { |
| *encoding = _dict_word_page_encoding_type; |
| return Status::OK(); |
| } |
| |
| Status BinaryDictPageBuilder::get_first_value(void* value) const { |
| DCHECK(_finished); |
| if (_data_page_builder->count() == 0) { |
| return Status::Error<ErrorCode::ENTRY_NOT_FOUND>("page is empty"); |
| } |
| if (_encoding_type != DICT_ENCODING) { |
| return _data_page_builder->get_first_value(value); |
| } |
| *reinterpret_cast<Slice*>(value) = Slice(_first_value); |
| return Status::OK(); |
| } |
| |
| Status BinaryDictPageBuilder::get_last_value(void* value) const { |
| DCHECK(_finished); |
| if (_data_page_builder->count() == 0) { |
| return Status::Error<ErrorCode::ENTRY_NOT_FOUND>("page is empty"); |
| } |
| if (_encoding_type != DICT_ENCODING) { |
| return _data_page_builder->get_last_value(value); |
| } |
| uint32_t value_code; |
| RETURN_IF_ERROR(_data_page_builder->get_last_value(&value_code)); |
| RETURN_IF_ERROR(_dict_builder->get_dict_word(value_code, reinterpret_cast<Slice*>(value))); |
| return Status::OK(); |
| } |
| |
| uint64_t BinaryDictPageBuilder::get_raw_data_size() const { |
| return _raw_data_size; |
| } |
| |
| BinaryDictPageDecoder::BinaryDictPageDecoder(Slice data, const PageDecoderOptions& options) |
| : _data(data), |
| _options(options), |
| _data_page_decoder(nullptr), |
| _parsed(false), |
| _encoding_type(UNKNOWN_ENCODING) {} |
| |
| Status BinaryDictPageDecoder::init() { |
| CHECK(!_parsed); |
| if (_data.size < BINARY_DICT_PAGE_HEADER_SIZE) { |
| return Status::Corruption("invalid data size:{}, header size:{}", _data.size, |
| BINARY_DICT_PAGE_HEADER_SIZE); |
| } |
| size_t type = decode_fixed32_le((const uint8_t*)&_data.data[0]); |
| _encoding_type = static_cast<EncodingTypePB>(type); |
| _data.remove_prefix(BINARY_DICT_PAGE_HEADER_SIZE); |
| if (_encoding_type == DICT_ENCODING) { |
| _data_page_decoder.reset( |
| _bit_shuffle_ptr = |
| new BitShufflePageDecoder<FieldType::OLAP_FIELD_TYPE_INT>(_data, _options)); |
| } else if (_encoding_type == PLAIN_ENCODING) { |
| _data_page_decoder.reset( |
| new BinaryPlainPageDecoder<FieldType::OLAP_FIELD_TYPE_VARCHAR>(_data, _options)); |
| } else if (_encoding_type == PLAIN_ENCODING_V2) { |
| _data_page_decoder.reset( |
| new BinaryPlainPageV2Decoder<FieldType::OLAP_FIELD_TYPE_VARCHAR>(_data, _options)); |
| } else { |
| LOG(WARNING) << "invalid encoding type:" << _encoding_type; |
| return Status::Corruption("invalid encoding type:{}", _encoding_type); |
| } |
| |
| RETURN_IF_ERROR(_data_page_decoder->init()); |
| _parsed = true; |
| return Status::OK(); |
| } |
| |
| BinaryDictPageDecoder::~BinaryDictPageDecoder() {} |
| |
| Status BinaryDictPageDecoder::seek_to_position_in_page(size_t pos) { |
| return _data_page_decoder->seek_to_position_in_page(pos); |
| } |
| |
| bool BinaryDictPageDecoder::is_dict_encoding() const { |
| return _encoding_type == DICT_ENCODING; |
| } |
| |
| void BinaryDictPageDecoder::set_dict_decoder(uint32_t num_dict_items, StringRef* dict_word_info) { |
| _num_dict_items = num_dict_items; |
| _dict_word_info = dict_word_info; |
| }; |
| |
| Status BinaryDictPageDecoder::next_batch(size_t* n, vectorized::MutableColumnPtr& dst) { |
| if (!is_dict_encoding()) { |
| dst = dst->convert_to_predicate_column_if_dictionary(); |
| return _data_page_decoder->next_batch(n, dst); |
| } |
| // dictionary encoding |
| DCHECK(_parsed); |
| DCHECK(_dict_word_info != nullptr) << "_dict_word_info is nullptr"; |
| |
| if (*n == 0 || _bit_shuffle_ptr->_cur_index >= _bit_shuffle_ptr->_num_elements) [[unlikely]] { |
| *n = 0; |
| return Status::OK(); |
| } |
| |
| size_t max_fetch = std::min(*n, static_cast<size_t>(_bit_shuffle_ptr->_num_elements - |
| _bit_shuffle_ptr->_cur_index)); |
| *n = max_fetch; |
| |
| const auto* data_array = reinterpret_cast<const int32_t*>(_bit_shuffle_ptr->get_data(0)); |
| size_t start_index = _bit_shuffle_ptr->_cur_index; |
| |
| dst->insert_many_dict_data(data_array, start_index, _dict_word_info, max_fetch, |
| _num_dict_items); |
| |
| _bit_shuffle_ptr->_cur_index += max_fetch; |
| |
| return Status::OK(); |
| } |
| |
| Status BinaryDictPageDecoder::read_by_rowids(const rowid_t* rowids, ordinal_t page_first_ordinal, |
| size_t* n, vectorized::MutableColumnPtr& dst) { |
| if (!is_dict_encoding()) { |
| dst = dst->convert_to_predicate_column_if_dictionary(); |
| return _data_page_decoder->read_by_rowids(rowids, page_first_ordinal, n, dst); |
| } |
| DCHECK(_parsed); |
| DCHECK(_dict_word_info != nullptr) << "_dict_word_info is nullptr"; |
| |
| if (*n == 0) [[unlikely]] { |
| *n = 0; |
| return Status::OK(); |
| } |
| |
| const auto* data_array = reinterpret_cast<const int32_t*>(_bit_shuffle_ptr->get_data(0)); |
| auto total = *n; |
| size_t read_count = 0; |
| _buffer.resize(total); |
| for (size_t i = 0; i < total; ++i) { |
| ordinal_t ord = rowids[i] - page_first_ordinal; |
| if (ord >= _bit_shuffle_ptr->_num_elements) [[unlikely]] { |
| break; |
| } |
| |
| _buffer[read_count++] = data_array[ord]; |
| } |
| |
| if (LIKELY(read_count > 0)) { |
| dst->insert_many_dict_data(_buffer.data(), 0, _dict_word_info, read_count, _num_dict_items); |
| } |
| *n = read_count; |
| return Status::OK(); |
| } |
| |
| #include "common/compile_check_end.h" |
| } // namespace segment_v2 |
| } // namespace doris |