| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "olap/rowset/segment_v2/column_reader.h" |
| |
| #include <assert.h> |
| #include <gen_cpp/Descriptors_types.h> |
| #include <gen_cpp/segment_v2.pb.h> |
| #include <glog/logging.h> |
| |
| #include <algorithm> |
| #include <memory> |
| #include <ostream> |
| #include <set> |
| #include <utility> |
| |
| #include "common/compiler_util.h" // IWYU pragma: keep |
| #include "common/status.h" |
| #include "io/fs/file_reader.h" |
| #include "olap/block_column_predicate.h" |
| #include "olap/column_predicate.h" |
| #include "olap/decimal12.h" |
| #include "olap/iterators.h" |
| #include "olap/olap_common.h" |
| #include "olap/rowset/segment_v2/ann_index/ann_index_reader.h" |
| #include "olap/rowset/segment_v2/binary_dict_page.h" // for BinaryDictPageDecoder |
| #include "olap/rowset/segment_v2/binary_plain_page.h" |
| #include "olap/rowset/segment_v2/bloom_filter.h" |
| #include "olap/rowset/segment_v2/bloom_filter_index_reader.h" |
| #include "olap/rowset/segment_v2/column_meta_accessor.h" |
| #include "olap/rowset/segment_v2/encoding_info.h" // for EncodingInfo |
| #include "olap/rowset/segment_v2/index_file_reader.h" |
| #include "olap/rowset/segment_v2/index_reader.h" |
| #include "olap/rowset/segment_v2/inverted_index/analyzer/analyzer.h" |
| #include "olap/rowset/segment_v2/inverted_index_reader.h" |
| #include "olap/rowset/segment_v2/page_decoder.h" |
| #include "olap/rowset/segment_v2/page_handle.h" // for PageHandle |
| #include "olap/rowset/segment_v2/page_io.h" |
| #include "olap/rowset/segment_v2/page_pointer.h" // for PagePointer |
| #include "olap/rowset/segment_v2/row_ranges.h" |
| #include "olap/rowset/segment_v2/segment.h" |
| #include "olap/rowset/segment_v2/variant/variant_column_reader.h" |
| #include "olap/rowset/segment_v2/zone_map_index.h" |
| #include "olap/tablet_schema.h" |
| #include "olap/types.h" // for TypeInfo |
| #include "olap/wrapper_field.h" |
| #include "runtime/decimalv2_value.h" |
| #include "runtime/define_primitive_type.h" |
| #include "util/binary_cast.hpp" |
| #include "util/bitmap.h" |
| #include "util/block_compression.h" |
| #include "util/rle_encoding.h" // for RleDecoder |
| #include "util/slice.h" |
| #include "vec/columns/column.h" |
| #include "vec/columns/column_array.h" |
| #include "vec/columns/column_map.h" |
| #include "vec/columns/column_nullable.h" |
| #include "vec/columns/column_struct.h" |
| #include "vec/columns/column_vector.h" |
| #include "vec/common/assert_cast.h" |
| #include "vec/common/schema_util.h" |
| #include "vec/common/string_ref.h" |
| #include "vec/core/types.h" |
| #include "vec/data_types/data_type_agg_state.h" |
| #include "vec/data_types/data_type_factory.hpp" |
| #include "vec/data_types/data_type_nullable.h" |
| #include "vec/runtime/vdatetime_value.h" //for VecDateTime |
| |
| namespace doris::segment_v2 { |
| #include "column_reader.h" |
| #include "common/compile_check_begin.h" |
| |
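| // Serialized agg-state types that are stored as raw string payloads and are |
| // therefore read back without typed decoding. |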
| inline bool read_as_string(PrimitiveType type) { |
| return type == PrimitiveType::TYPE_STRING || type == PrimitiveType::INVALID_TYPE || |
| type == PrimitiveType::TYPE_BITMAP || type == PrimitiveType::TYPE_FIXED_LENGTH_OBJECT; |
| } |
| |
| Status ColumnReader::create_array(const ColumnReaderOptions& opts, const ColumnMetaPB& meta, |
| const io::FileReaderSPtr& file_reader, |
| std::shared_ptr<ColumnReader>* reader) { |
| DCHECK(meta.children_columns_size() == 2 || meta.children_columns_size() == 3); |
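| // children layout: [0] item, [1] offsets, [2] null map (present only when nullable) |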
| |
| std::shared_ptr<ColumnReader> item_reader; |
| RETURN_IF_ERROR(ColumnReader::create(opts, meta.children_columns(0), |
| meta.children_columns(0).num_rows(), file_reader, |
| &item_reader)); |
| |
| std::shared_ptr<ColumnReader> offset_reader; |
| RETURN_IF_ERROR(ColumnReader::create(opts, meta.children_columns(1), |
| meta.children_columns(1).num_rows(), file_reader, |
| &offset_reader)); |
| |
| std::shared_ptr<ColumnReader> null_reader; |
| if (meta.is_nullable()) { |
| RETURN_IF_ERROR(ColumnReader::create(opts, meta.children_columns(2), |
| meta.children_columns(2).num_rows(), file_reader, |
| &null_reader)); |
| } |
| |
| // The number of rows in the array reader equals that of the offsets (length) reader. |
| uint64_t array_num_rows = meta.children_columns(1).num_rows(); |
| std::shared_ptr<ColumnReader> array_reader( |
| new ColumnReader(opts, meta, array_num_rows, file_reader)); |
| // the array reader itself does not need init(); its sub readers are already initialized |
| array_reader->_sub_readers.resize(meta.children_columns_size()); |
| array_reader->_sub_readers[0] = std::move(item_reader); |
| array_reader->_sub_readers[1] = std::move(offset_reader); |
| if (meta.is_nullable()) { |
| array_reader->_sub_readers[2] = std::move(null_reader); |
| } |
| array_reader->_meta_type = FieldType::OLAP_FIELD_TYPE_ARRAY; |
| *reader = std::move(array_reader); |
| return Status::OK(); |
| } |
| |
| Status ColumnReader::create_map(const ColumnReaderOptions& opts, const ColumnMetaPB& meta, |
| const io::FileReaderSPtr& file_reader, |
| std::shared_ptr<ColumnReader>* reader) { |
| // a map reader has 3 or 4 sub readers: key, value, offsets (scalar), and an optional null map (scalar) |
| DCHECK(meta.children_columns_size() == 3 || meta.children_columns_size() == 4); |
| std::shared_ptr<ColumnReader> key_reader; |
| RETURN_IF_ERROR(ColumnReader::create(opts, meta.children_columns(0), |
| meta.children_columns(0).num_rows(), file_reader, |
| &key_reader)); |
| std::shared_ptr<ColumnReader> val_reader; |
| RETURN_IF_ERROR(ColumnReader::create(opts, meta.children_columns(1), |
| meta.children_columns(1).num_rows(), file_reader, |
| &val_reader)); |
| std::shared_ptr<ColumnReader> offset_reader; |
| RETURN_IF_ERROR(ColumnReader::create(opts, meta.children_columns(2), |
| meta.children_columns(2).num_rows(), file_reader, |
| &offset_reader)); |
| std::shared_ptr<ColumnReader> null_reader; |
| if (meta.is_nullable()) { |
| RETURN_IF_ERROR(ColumnReader::create(opts, meta.children_columns(3), |
| meta.children_columns(3).num_rows(), file_reader, |
| &null_reader)); |
| } |
| |
| // The number of rows in the map reader equals that of the offsets (length) reader. |
| uint64_t map_num_rows = meta.children_columns(2).num_rows(); |
| std::shared_ptr<ColumnReader> map_reader( |
| new ColumnReader(opts, meta, map_num_rows, file_reader)); |
| map_reader->_sub_readers.resize(meta.children_columns_size()); |
| |
| map_reader->_sub_readers[0] = std::move(key_reader); |
| map_reader->_sub_readers[1] = std::move(val_reader); |
| map_reader->_sub_readers[2] = std::move(offset_reader); |
| if (meta.is_nullable()) { |
| map_reader->_sub_readers[3] = std::move(null_reader); |
| } |
| map_reader->_meta_type = FieldType::OLAP_FIELD_TYPE_MAP; |
| *reader = std::move(map_reader); |
| return Status::OK(); |
| } |
| |
| Status ColumnReader::create_struct(const ColumnReaderOptions& opts, const ColumnMetaPB& meta, |
| uint64_t num_rows, const io::FileReaderSPtr& file_reader, |
| std::shared_ptr<ColumnReader>* reader) { |
| // empty structs are not supported |
| DCHECK(meta.children_columns_size() >= 1); |
| // create struct column reader |
| std::shared_ptr<ColumnReader> struct_reader( |
| new ColumnReader(opts, meta, num_rows, file_reader)); |
| struct_reader->_sub_readers.reserve(meta.children_columns_size()); |
| // struct columns may gain children through schema change, so create a sub reader for every child column recorded in the meta |
| for (int i = 0; i < meta.children_columns_size(); i++) { |
| std::shared_ptr<ColumnReader> sub_reader; |
| RETURN_IF_ERROR(ColumnReader::create(opts, meta.children_columns(i), |
| meta.children_columns(i).num_rows(), file_reader, |
| &sub_reader)); |
| struct_reader->_sub_readers.push_back(std::move(sub_reader)); |
| } |
| struct_reader->_meta_type = FieldType::OLAP_FIELD_TYPE_STRUCT; |
| *reader = std::move(struct_reader); |
| return Status::OK(); |
| } |
| |
| Status ColumnReader::create_agg_state(const ColumnReaderOptions& opts, const ColumnMetaPB& meta, |
| uint64_t num_rows, const io::FileReaderSPtr& file_reader, |
| std::shared_ptr<ColumnReader>* reader) { |
| if (!meta.has_function_name()) { // old-version ColumnMetaPB without a function name |
| std::shared_ptr<ColumnReader> reader_local( |
| new ColumnReader(opts, meta, num_rows, file_reader)); |
| RETURN_IF_ERROR(reader_local->init(&meta)); |
| *reader = std::move(reader_local); |
| return Status::OK(); |
| } |
| |
| auto data_type = vectorized::DataTypeFactory::instance().create_data_type(meta); |
| const auto* agg_state_type = assert_cast<const vectorized::DataTypeAggState*>(data_type.get()); |
| agg_state_type->check_function_compatibility(opts.be_exec_version); |
| auto type = agg_state_type->get_serialized_type()->get_primitive_type(); |
| |
| if (read_as_string(type)) { |
| std::shared_ptr<ColumnReader> reader_local( |
| new ColumnReader(opts, meta, num_rows, file_reader)); |
| RETURN_IF_ERROR(reader_local->init(&meta)); |
| *reader = std::move(reader_local); |
| return Status::OK(); |
| } else if (type == PrimitiveType::TYPE_MAP) { |
| return create_map(opts, meta, file_reader, reader); |
| } else if (type == PrimitiveType::TYPE_ARRAY) { |
| return create_array(opts, meta, file_reader, reader); |
| } else if (type == PrimitiveType::TYPE_STRUCT) { |
| return create_struct(opts, meta, num_rows, file_reader, reader); |
| } |
| |
| return Status::InternalError("Not supported type: {}, serialized type: {}", |
| agg_state_type->get_name(), int(type)); |
| } |
| |
| bool ColumnReader::is_compaction_reader_type(ReaderType type) { |
| return type == ReaderType::READER_BASE_COMPACTION || |
| type == ReaderType::READER_CUMULATIVE_COMPACTION || |
| type == ReaderType::READER_COLD_DATA_COMPACTION || |
| type == ReaderType::READER_SEGMENT_COMPACTION || |
| type == ReaderType::READER_FULL_COMPACTION; |
| } |
| |
| Status ColumnReader::create(const ColumnReaderOptions& opts, const ColumnMetaPB& meta, |
| uint64_t num_rows, const io::FileReaderSPtr& file_reader, |
| std::shared_ptr<ColumnReader>* reader) { |
| if (is_scalar_type((FieldType)meta.type())) { |
| std::shared_ptr<ColumnReader> reader_local( |
| new ColumnReader(opts, meta, num_rows, file_reader)); |
| RETURN_IF_ERROR(reader_local->init(&meta)); |
| *reader = std::move(reader_local); |
| return Status::OK(); |
| } else { |
| auto type = (FieldType)meta.type(); |
| switch (type) { |
| case FieldType::OLAP_FIELD_TYPE_AGG_STATE: { |
| return create_agg_state(opts, meta, num_rows, file_reader, reader); |
| } |
| case FieldType::OLAP_FIELD_TYPE_STRUCT: { |
| return create_struct(opts, meta, num_rows, file_reader, reader); |
| } |
| case FieldType::OLAP_FIELD_TYPE_ARRAY: { |
| return create_array(opts, meta, file_reader, reader); |
| } |
| case FieldType::OLAP_FIELD_TYPE_MAP: { |
| return create_map(opts, meta, file_reader, reader); |
| } |
| case FieldType::OLAP_FIELD_TYPE_VARIANT: { |
| // Read only the variant root data using a single ColumnReader |
| std::shared_ptr<ColumnReader> reader_local( |
| new ColumnReader(opts, meta, num_rows, file_reader)); |
| RETURN_IF_ERROR(reader_local->init(&meta)); |
| *reader = std::move(reader_local); |
| return Status::OK(); |
| } |
| default: |
| return Status::NotSupported("unsupported type for ColumnReader: {}", |
| std::to_string(int(type))); |
| } |
| } |
| } |
| |
| ColumnReader::ColumnReader() = default; |
| |
| ColumnReader::ColumnReader(const ColumnReaderOptions& opts, const ColumnMetaPB& meta, |
| uint64_t num_rows, io::FileReaderSPtr file_reader) |
| : _use_index_page_cache(!config::disable_storage_page_cache), |
| _opts(opts), |
| _num_rows(num_rows), |
| _file_reader(std::move(file_reader)), |
| _dict_encoding_type(UNKNOWN_DICT_ENCODING) { |
| _meta_length = meta.length(); |
| _meta_type = (FieldType)meta.type(); |
| if (_meta_type == FieldType::OLAP_FIELD_TYPE_ARRAY) { |
| _meta_children_column_type = (FieldType)meta.children_columns(0).type(); |
| } |
| _data_type = vectorized::DataTypeFactory::instance().create_data_type(meta); |
| _meta_is_nullable = meta.is_nullable(); |
| _meta_dict_page = meta.dict_page(); |
| _meta_compression = meta.compression(); |
| } |
| |
| ColumnReader::~ColumnReader() = default; |
| |
| int64_t ColumnReader::get_metadata_size() const { |
| return sizeof(ColumnReader) + (_segment_zone_map ? _segment_zone_map->ByteSizeLong() : 0); |
| } |
| |
| #ifdef BE_TEST |
| /// This function is only used in UT to verify the correctness of data read from zone map |
| /// See UT case 'SegCompactionMoWTest.SegCompactionInterleaveWithBig_ooooOOoOooooooooO' |
| /// be/test/olap/segcompaction_mow_test.cpp |
| void ColumnReader::check_data_by_zone_map_for_test(const vectorized::MutableColumnPtr& dst) const { |
| if (!_segment_zone_map) { |
| return; |
| } |
| |
| const auto rows = dst->size(); |
| if (rows == 0) { |
| return; |
| } |
| |
| FieldType type = _type_info->type(); |
| |
| if (type != FieldType::OLAP_FIELD_TYPE_INT) { |
| return; |
| } |
| |
| auto* non_nullable_column = dst->is_nullable() |
| ? assert_cast<vectorized::ColumnNullable*>(dst.get()) |
| ->get_nested_column_ptr() |
| .get() |
| : dst.get(); |
| |
| /// `PredicateColumnType<TYPE_INT>` does not support `void get(size_t n, Field& res)`, |
| /// so only `ColumnVector<TYPE_INT>` is checked here |
| if (vectorized::check_and_get_column<vectorized::ColumnVector<TYPE_INT>>(non_nullable_column) == |
| nullptr) { |
| return; |
| } |
| |
| std::unique_ptr<WrapperField> min_value(WrapperField::create_by_type(type, _meta_length)); |
| std::unique_ptr<WrapperField> max_value(WrapperField::create_by_type(type, _meta_length)); |
| THROW_IF_ERROR(_parse_zone_map(*_segment_zone_map, min_value.get(), max_value.get())); |
| |
| if (min_value->is_null() || max_value->is_null()) { |
| return; |
| } |
| |
| int32_t min_v = *reinterpret_cast<int32_t*>(min_value->cell_ptr()); |
| int32_t max_v = *reinterpret_cast<int32_t*>(max_value->cell_ptr()); |
| |
| for (size_t i = 0; i != rows; ++i) { |
| vectorized::Field field; |
| dst->get(i, field); |
| DCHECK(!field.is_null()); |
| const auto v = field.get<int32_t>(); |
| DCHECK_GE(v, min_v); |
| DCHECK_LE(v, max_v); |
| } |
| } |
| #endif |
| |
| Status ColumnReader::init(const ColumnMetaPB* meta) { |
| _type_info = get_type_info(meta); |
| |
| if (meta->has_be_exec_version()) { |
| _be_exec_version = meta->be_exec_version(); |
| } |
| |
| if (_type_info == nullptr) { |
| return Status::NotSupported("unsupported typeinfo, type={}", meta->type()); |
| } |
| RETURN_IF_ERROR(EncodingInfo::get(_type_info->type(), meta->encoding(), {}, &_encoding_info)); |
| |
| for (int i = 0; i < meta->indexes_size(); i++) { |
| const auto& index_meta = meta->indexes(i); |
| switch (index_meta.type()) { |
| case BITMAP_INDEX: |
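| // bitmap index metadata is not used by ColumnReader; skip it |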
| break; |
| case ORDINAL_INDEX: |
| _ordinal_index.reset( |
| new OrdinalIndexReader(_file_reader, _num_rows, index_meta.ordinal_index())); |
| break; |
| case ZONE_MAP_INDEX: |
| _segment_zone_map = |
| std::make_unique<ZoneMapPB>(index_meta.zone_map_index().segment_zone_map()); |
| _zone_map_index.reset(new ZoneMapIndexReader( |
| _file_reader, index_meta.zone_map_index().page_zone_maps())); |
| break; |
| case BLOOM_FILTER_INDEX: |
| _bloom_filter_index.reset( |
| new BloomFilterIndexReader(_file_reader, index_meta.bloom_filter_index())); |
| break; |
| default: |
| return Status::Corruption("Bad file {}: invalid column index type {}", |
| _file_reader->path().native(), index_meta.type()); |
| } |
| } |
| update_metadata_size(); |
| |
| // ArrayColumnWriter writes a single empty array and flushes. In this scenario, |
| // the item writer doesn't write any data and the corresponding ordinal index is empty. |
| if (_ordinal_index == nullptr && !is_empty()) { |
| return Status::Corruption("Bad file {}: missing ordinal index for column {}", |
| _file_reader->path().native(), meta->column_id()); |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status ColumnReader::new_index_iterator(const std::shared_ptr<IndexFileReader>& index_file_reader, |
| const TabletIndex* index_meta, |
| std::unique_ptr<IndexIterator>* iterator) { |
| RETURN_IF_ERROR(_load_index(index_file_reader, index_meta)); |
| { |
| std::shared_lock<std::shared_mutex> rlock(_load_index_lock); |
| auto iter = _index_readers.find(index_meta->index_id()); |
| if (iter != _index_readers.end()) { |
| if (iter->second != nullptr) { |
| RETURN_IF_ERROR(iter->second->new_iterator(iterator)); |
| } |
| } |
| } |
| return Status::OK(); |
| } |
| |
| Status ColumnReader::read_page(const ColumnIteratorOptions& iter_opts, const PagePointer& pp, |
| PageHandle* handle, Slice* page_body, PageFooterPB* footer, |
| BlockCompressionCodec* codec, bool is_dict_page) const { |
| iter_opts.sanity_check(); |
| PageReadOptions opts(iter_opts.io_ctx); |
| opts.verify_checksum = _opts.verify_checksum; |
| opts.use_page_cache = iter_opts.use_page_cache; |
| opts.kept_in_memory = _opts.kept_in_memory; |
| opts.type = iter_opts.type; |
| opts.file_reader = iter_opts.file_reader; |
| opts.page_pointer = pp; |
| opts.codec = codec; |
| opts.stats = iter_opts.stats; |
| opts.encoding_info = _encoding_info; |
| opts.is_dict_page = is_dict_page; |
| |
| return PageIO::read_and_decompress_page(opts, handle, page_body, footer); |
| } |
| |
| Status ColumnReader::get_row_ranges_by_zone_map( |
| const AndBlockColumnPredicate* col_predicates, |
| const std::vector<std::shared_ptr<const ColumnPredicate>>* delete_predicates, |
| RowRanges* row_ranges, const ColumnIteratorOptions& iter_opts) { |
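| // two steps: keep only pages whose zone maps may satisfy the predicates, then |
| // translate the surviving page indexes into row ranges via the ordinal index |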
| std::vector<uint32_t> page_indexes; |
| RETURN_IF_ERROR( |
| _get_filtered_pages(col_predicates, delete_predicates, &page_indexes, iter_opts)); |
| RETURN_IF_ERROR(_calculate_row_ranges(page_indexes, row_ranges, iter_opts)); |
| return Status::OK(); |
| } |
| |
| Status ColumnReader::next_batch_of_zone_map(size_t* n, vectorized::MutableColumnPtr& dst) const { |
| if (_segment_zone_map == nullptr) { |
| return Status::InternalError("segment zonemap not exist"); |
| } |
| // TODO: computing the min/max values here should only need to be done once |
| FieldType type = _type_info->type(); |
| std::unique_ptr<WrapperField> min_value(WrapperField::create_by_type(type, _meta_length)); |
| std::unique_ptr<WrapperField> max_value(WrapperField::create_by_type(type, _meta_length)); |
| RETURN_IF_ERROR( |
| _parse_zone_map_skip_null(*_segment_zone_map, min_value.get(), max_value.get())); |
| |
| dst->reserve(*n); |
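| // fill the first slot with the zone-map max and the remaining n - 1 slots with |
| // the min; null bounds are materialized as defaults on the nullable column |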
| bool is_string = is_olap_string_type(type); |
| if (max_value->is_null()) { |
| assert_cast<vectorized::ColumnNullable&>(*dst).insert_default(); |
| } else { |
| if (is_string) { |
| auto sv = (StringRef*)max_value->cell_ptr(); |
| dst->insert_data(sv->data, sv->size); |
| } else { |
| dst->insert_many_fix_len_data(static_cast<const char*>(max_value->cell_ptr()), 1); |
| } |
| } |
| |
| auto size = *n - 1; |
| if (min_value->is_null()) { |
| assert_cast<vectorized::ColumnNullable&>(*dst).insert_many_defaults(size); |
| } else { |
| if (is_string) { |
| auto sv = (StringRef*)min_value->cell_ptr(); |
| dst->insert_data_repeatedly(sv->data, sv->size, size); |
| } else { |
| // TODO: this per-row loop may cause performance problems; optimize later |
| for (int i = 0; i < size; ++i) { |
| dst->insert_many_fix_len_data(static_cast<const char*>(min_value->cell_ptr()), 1); |
| } |
| } |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status ColumnReader::match_condition(const AndBlockColumnPredicate* col_predicates, |
| bool* matched) const { |
| *matched = true; |
| if (_zone_map_index == nullptr) { |
| return Status::OK(); |
| } |
| FieldType type = _type_info->type(); |
| std::unique_ptr<WrapperField> min_value(WrapperField::create_by_type(type, _meta_length)); |
| std::unique_ptr<WrapperField> max_value(WrapperField::create_by_type(type, _meta_length)); |
| RETURN_IF_ERROR(_parse_zone_map(*_segment_zone_map, min_value.get(), max_value.get())); |
| |
| *matched = _zone_map_match_condition(*_segment_zone_map, min_value.get(), max_value.get(), |
| col_predicates); |
| return Status::OK(); |
| } |
| |
| Status ColumnReader::prune_predicates_by_zone_map( |
| std::vector<std::shared_ptr<ColumnPredicate>>& predicates, const int column_id, |
| bool* pruned) const { |
| *pruned = false; |
| if (_zone_map_index == nullptr) { |
| return Status::OK(); |
| } |
| |
| FieldType type = _type_info->type(); |
| std::unique_ptr<WrapperField> min_value(WrapperField::create_by_type(type, _meta_length)); |
| std::unique_ptr<WrapperField> max_value(WrapperField::create_by_type(type, _meta_length)); |
| RETURN_IF_ERROR(_parse_zone_map(*_segment_zone_map, min_value.get(), max_value.get())); |
| |
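| // drop predicates that every row in [min, max] satisfies; they cannot filter |
| // any row of this column and need not be evaluated again |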
| for (auto it = predicates.begin(); it != predicates.end();) { |
| auto predicate = *it; |
| if (predicate->column_id() == column_id && |
| predicate->is_always_true({min_value.get(), max_value.get()})) { |
| *pruned = true; |
| it = predicates.erase(it); |
| } else { |
| ++it; |
| } |
| } |
| return Status::OK(); |
| } |
| |
| Status ColumnReader::_parse_zone_map(const ZoneMapPB& zone_map, WrapperField* min_value_container, |
| WrapperField* max_value_container) const { |
| // min value and max value are valid if has_not_null is true |
| if (zone_map.has_not_null()) { |
| min_value_container->set_not_null(); |
| max_value_container->set_not_null(); |
| |
| if (zone_map.has_negative_inf()) { |
| if (FieldType::OLAP_FIELD_TYPE_FLOAT == _meta_type) { |
| static auto constexpr float_neg_inf = -std::numeric_limits<float>::infinity(); |
| min_value_container->set_raw_value(&float_neg_inf, sizeof(float_neg_inf)); |
| } else if (FieldType::OLAP_FIELD_TYPE_DOUBLE == _meta_type) { |
| static auto constexpr double_neg_inf = -std::numeric_limits<double>::infinity(); |
| min_value_container->set_raw_value(&double_neg_inf, sizeof(double_neg_inf)); |
| } else { |
| return Status::InternalError("invalid zone map with negative Infinity"); |
| } |
| } else { |
| RETURN_IF_ERROR(min_value_container->from_string(zone_map.min())); |
| } |
| |
| if (zone_map.has_nan()) { |
| if (FieldType::OLAP_FIELD_TYPE_FLOAT == _meta_type) { |
| static auto constexpr float_nan = std::numeric_limits<float>::quiet_NaN(); |
| max_value_container->set_raw_value(&float_nan, sizeof(float_nan)); |
| } else if (FieldType::OLAP_FIELD_TYPE_DOUBLE == _meta_type) { |
| static auto constexpr double_nan = std::numeric_limits<double>::quiet_NaN(); |
| max_value_container->set_raw_value(&double_nan, sizeof(double_nan)); |
| } else { |
| return Status::InternalError("invalid zone map with NaN"); |
| } |
| } else if (zone_map.has_positive_inf()) { |
| if (FieldType::OLAP_FIELD_TYPE_FLOAT == _meta_type) { |
| static auto constexpr float_pos_inf = std::numeric_limits<float>::infinity(); |
| max_value_container->set_raw_value(&float_pos_inf, sizeof(float_pos_inf)); |
| } else if (FieldType::OLAP_FIELD_TYPE_DOUBLE == _meta_type) { |
| static auto constexpr double_pos_inf = std::numeric_limits<double>::infinity(); |
| max_value_container->set_raw_value(&double_pos_inf, sizeof(double_pos_inf)); |
| } else { |
| return Status::InternalError("invalid zone map with positive Infinity"); |
| } |
| } else { |
| RETURN_IF_ERROR(max_value_container->from_string(zone_map.max())); |
| } |
| } |
| // for compatibility with the original Cond evaluation logic |
| if (zone_map.has_null()) { |
| // for compatibility: when nulls exist, the original logic treats null as the min |
| min_value_container->set_null(); |
| if (!zone_map.has_not_null()) { |
| // for compatibility with OlapCond's 'is not null' |
| max_value_container->set_null(); |
| } |
| } |
| return Status::OK(); |
| } |
| |
| Status ColumnReader::_parse_zone_map_skip_null(const ZoneMapPB& zone_map, |
| WrapperField* min_value_container, |
| WrapperField* max_value_container) const { |
| // min value and max value are valid if has_not_null is true |
| if (zone_map.has_not_null()) { |
| RETURN_IF_ERROR(min_value_container->from_string(zone_map.min())); |
| RETURN_IF_ERROR(max_value_container->from_string(zone_map.max())); |
| } else { |
| min_value_container->set_null(); |
| max_value_container->set_null(); |
| } |
| return Status::OK(); |
| } |
| |
| bool ColumnReader::_zone_map_match_condition(const ZoneMapPB& zone_map, |
| WrapperField* min_value_container, |
| WrapperField* max_value_container, |
| const AndBlockColumnPredicate* col_predicates) const { |
| if (zone_map.pass_all() || min_value_container == nullptr || max_value_container == nullptr) { |
| return true; |
| } |
| |
| return col_predicates->evaluate_and({min_value_container, max_value_container}); |
| } |
| |
| Status ColumnReader::_get_filtered_pages( |
| const AndBlockColumnPredicate* col_predicates, |
| const std::vector<std::shared_ptr<const ColumnPredicate>>* delete_predicates, |
| std::vector<uint32_t>* page_indexes, const ColumnIteratorOptions& iter_opts) { |
| RETURN_IF_ERROR(_load_zone_map_index(_use_index_page_cache, _opts.kept_in_memory, iter_opts)); |
| |
| FieldType type = _type_info->type(); |
| const std::vector<ZoneMapPB>& zone_maps = _zone_map_index->page_zone_maps(); |
| size_t page_size = _zone_map_index->num_pages(); |
| std::unique_ptr<WrapperField> min_value(WrapperField::create_by_type(type, _meta_length)); |
| std::unique_ptr<WrapperField> max_value(WrapperField::create_by_type(type, _meta_length)); |
| for (size_t i = 0; i < page_size; ++i) { |
| if (zone_maps[i].pass_all()) { |
| page_indexes->push_back(cast_set<uint32_t>(i)); |
| } else { |
| RETURN_IF_ERROR(_parse_zone_map(zone_maps[i], min_value.get(), max_value.get())); |
| if (_zone_map_match_condition(zone_maps[i], min_value.get(), max_value.get(), |
| col_predicates)) { |
| bool should_read = true; |
| if (delete_predicates != nullptr) { |
| for (auto del_pred : *delete_predicates) { |
| // TODO: `min_value` and `max_value` are either both null or both non-null, |
| // so the nullptr check only needs to be done once. |
| if (min_value != nullptr && max_value != nullptr && |
| del_pred->evaluate_del({min_value.get(), max_value.get()})) { |
| should_read = false; |
| break; |
| } |
| } |
| } |
| if (should_read) { |
| page_indexes->push_back(cast_set<uint32_t>(i)); |
| } |
| } |
| } |
| } |
| VLOG(1) << "total-pages: " << page_size << " not-filtered-pages: " << page_indexes->size() |
| << " filtered-percent:" |
| << 1.0 - (static_cast<double>(page_indexes->size()) / |
| (static_cast<double>(page_size) * 1.0)); |
| return Status::OK(); |
| } |
| |
| Status ColumnReader::_calculate_row_ranges(const std::vector<uint32_t>& page_indexes, |
| RowRanges* row_ranges, |
| const ColumnIteratorOptions& iter_opts) { |
| row_ranges->clear(); |
| RETURN_IF_ERROR(_load_ordinal_index(_use_index_page_cache, _opts.kept_in_memory, iter_opts)); |
| for (auto i : page_indexes) { |
| ordinal_t page_first_id = _ordinal_index->get_first_ordinal(i); |
| ordinal_t page_last_id = _ordinal_index->get_last_ordinal(i); |
| RowRanges page_row_ranges(RowRanges::create_single(page_first_id, page_last_id + 1)); |
| RowRanges::ranges_union(*row_ranges, page_row_ranges, row_ranges); |
| } |
| return Status::OK(); |
| } |
| |
| Status ColumnReader::get_row_ranges_by_bloom_filter(const AndBlockColumnPredicate* col_predicates, |
| RowRanges* row_ranges, |
| const ColumnIteratorOptions& iter_opts) { |
| RETURN_IF_ERROR(_load_ordinal_index(_use_index_page_cache, _opts.kept_in_memory, iter_opts)); |
| RETURN_IF_ERROR( |
| _load_bloom_filter_index(_use_index_page_cache, _opts.kept_in_memory, iter_opts)); |
| RowRanges bf_row_ranges; |
| std::unique_ptr<BloomFilterIndexIterator> bf_iter; |
| RETURN_IF_ERROR(_bloom_filter_index->new_iterator(&bf_iter, iter_opts.stats)); |
| size_t range_size = row_ranges->range_size(); |
| // get covered page ids |
| std::set<uint32_t> page_ids; |
| for (int i = 0; i < range_size; ++i) { |
| int64_t from = row_ranges->get_range_from(i); |
| int64_t idx = from; |
| int64_t to = row_ranges->get_range_to(i); |
| auto iter = _ordinal_index->seek_at_or_before(from); |
| while (idx < to && iter.valid()) { |
| page_ids.insert(iter.page_index()); |
| idx = iter.last_ordinal() + 1; |
| iter.next(); |
| } |
| } |
| for (auto& pid : page_ids) { |
| std::unique_ptr<BloomFilter> bf; |
| RETURN_IF_ERROR(bf_iter->read_bloom_filter(pid, &bf)); |
| if (col_predicates->evaluate_and(bf.get())) { |
| bf_row_ranges.add(RowRange(_ordinal_index->get_first_ordinal(pid), |
| _ordinal_index->get_last_ordinal(pid) + 1)); |
| } |
| } |
| RowRanges::ranges_intersection(*row_ranges, bf_row_ranges, row_ranges); |
| return Status::OK(); |
| } |
| |
| Status ColumnReader::_load_ordinal_index(bool use_page_cache, bool kept_in_memory, |
| const ColumnIteratorOptions& iter_opts) { |
| if (!_ordinal_index) { |
| return Status::InternalError("ordinal_index not inited"); |
| } |
| return _ordinal_index->load(use_page_cache, kept_in_memory, iter_opts.stats); |
| } |
| |
| Status ColumnReader::_load_zone_map_index(bool use_page_cache, bool kept_in_memory, |
| const ColumnIteratorOptions& iter_opts) { |
| if (_zone_map_index != nullptr) { |
| return _zone_map_index->load(use_page_cache, kept_in_memory, iter_opts.stats); |
| } |
| return Status::OK(); |
| } |
| |
| Status ColumnReader::_load_index(const std::shared_ptr<IndexFileReader>& index_file_reader, |
| const TabletIndex* index_meta) { |
| std::unique_lock<std::shared_mutex> wlock(_load_index_lock); |
| |
| if (index_meta == nullptr) { |
| return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( |
| "Failed to load inverted index: index metadata is null"); |
| } |
| |
| auto it = _index_readers.find(index_meta->index_id()); |
| if (it != _index_readers.end()) { |
| return Status::OK(); |
| } |
| |
| bool should_analyzer = |
| inverted_index::InvertedIndexAnalyzer::should_analyzer(index_meta->properties()); |
| |
| FieldType type; |
| if (_meta_type == FieldType::OLAP_FIELD_TYPE_ARRAY) { |
| type = _meta_children_column_type; |
| } else { |
| type = _type_info->type(); |
| } |
| |
| if (index_meta->index_type() == IndexType::ANN) { |
| _index_readers[index_meta->index_id()] = |
| std::make_shared<AnnIndexReader>(index_meta, index_file_reader); |
| return Status::OK(); |
| } |
| |
| IndexReaderPtr index_reader; |
| |
| if (is_string_type(type)) { |
| if (should_analyzer) { |
| try { |
| index_reader = FullTextIndexReader::create_shared(index_meta, index_file_reader); |
| } catch (const CLuceneError& e) { |
| return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( |
| "create FullTextIndexReader error: {}", e.what()); |
| } |
| } else { |
| try { |
| index_reader = |
| StringTypeInvertedIndexReader::create_shared(index_meta, index_file_reader); |
| } catch (const CLuceneError& e) { |
| return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( |
| "create StringTypeInvertedIndexReader error: {}", e.what()); |
| } |
| } |
| } else if (is_numeric_type(type)) { |
| try { |
| index_reader = BkdIndexReader::create_shared(index_meta, index_file_reader); |
| } catch (const CLuceneError& e) { |
| return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( |
| "create BkdIndexReader error: {}", e.what()); |
| } |
| } else { |
| return Status::Error<ErrorCode::INVERTED_INDEX_NOT_SUPPORTED>( |
| "Field type {} is not supported for inverted index", type); |
| } |
| _index_readers[index_meta->index_id()] = index_reader; |
| return Status::OK(); |
| } |
| |
| bool ColumnReader::has_bloom_filter_index(bool ngram) const { |
| if (_bloom_filter_index == nullptr) return false; |
| |
| if (ngram) { |
| return _bloom_filter_index->algorithm() == BloomFilterAlgorithmPB::NGRAM_BLOOM_FILTER; |
| } else { |
| return _bloom_filter_index->algorithm() != BloomFilterAlgorithmPB::NGRAM_BLOOM_FILTER; |
| } |
| } |
| |
| Status ColumnReader::_load_bloom_filter_index(bool use_page_cache, bool kept_in_memory, |
| const ColumnIteratorOptions& iter_opts) { |
| if (_bloom_filter_index != nullptr) { |
| return _bloom_filter_index->load(use_page_cache, kept_in_memory, iter_opts.stats); |
| } |
| return Status::OK(); |
| } |
| |
| Status ColumnReader::seek_at_or_before(ordinal_t ordinal, OrdinalPageIndexIterator* iter, |
| const ColumnIteratorOptions& iter_opts) { |
| RETURN_IF_ERROR(_load_ordinal_index(_use_index_page_cache, _opts.kept_in_memory, iter_opts)); |
| *iter = _ordinal_index->seek_at_or_before(ordinal); |
| if (!iter->valid()) { |
| return Status::NotFound("Failed to seek to ordinal {}", ordinal); |
| } |
| return Status::OK(); |
| } |
| |
| Status ColumnReader::new_iterator(ColumnIteratorUPtr* iterator, const TabletColumn* tablet_column) { |
| return new_iterator(iterator, tablet_column, nullptr); |
| } |
| |
| Status ColumnReader::new_iterator(ColumnIteratorUPtr* iterator, const TabletColumn* tablet_column, |
| const StorageReadOptions* opt) { |
| if (is_empty()) { |
| *iterator = std::make_unique<EmptyFileColumnIterator>(); |
| return Status::OK(); |
| } |
| if (is_scalar_type(_meta_type)) { |
| *iterator = std::make_unique<FileColumnIterator>(shared_from_this()); |
| (*iterator)->set_column_name(tablet_column ? tablet_column->name() : ""); |
| return Status::OK(); |
| } else { |
| auto type = _meta_type; |
| switch (type) { |
| case FieldType::OLAP_FIELD_TYPE_AGG_STATE: { |
| return new_agg_state_iterator(iterator); |
| } |
| case FieldType::OLAP_FIELD_TYPE_STRUCT: { |
| return new_struct_iterator(iterator, tablet_column); |
| } |
| case FieldType::OLAP_FIELD_TYPE_ARRAY: { |
| return new_array_iterator(iterator, tablet_column); |
| } |
| case FieldType::OLAP_FIELD_TYPE_MAP: { |
| return new_map_iterator(iterator, tablet_column); |
| } |
| default: |
| return Status::NotSupported("unsupported type to create iterator: {}", |
| std::to_string(int(type))); |
| } |
| } |
| } |
| |
| Status ColumnReader::new_agg_state_iterator(ColumnIteratorUPtr* iterator) { |
| *iterator = std::make_unique<FileColumnIterator>(shared_from_this()); |
| return Status::OK(); |
| } |
| |
| Status ColumnReader::new_array_iterator(ColumnIteratorUPtr* iterator, |
| const TabletColumn* tablet_column) { |
| ColumnIteratorUPtr item_iterator; |
| RETURN_IF_ERROR(_sub_readers[0]->new_iterator( |
| &item_iterator, tablet_column && tablet_column->get_subtype_count() > 0 |
| ? &tablet_column->get_sub_column(0) |
| : nullptr)); |
| |
| item_iterator->set_column_name(tablet_column ? tablet_column->get_sub_column(0).name() : ""); |
| |
| ColumnIteratorUPtr offset_iterator; |
| RETURN_IF_ERROR(_sub_readers[1]->new_iterator(&offset_iterator, nullptr)); |
| auto* file_iter = static_cast<FileColumnIterator*>(offset_iterator.release()); |
| OffsetFileColumnIteratorUPtr ofcIter = std::make_unique<OffsetFileColumnIterator>( |
| std::unique_ptr<FileColumnIterator>(file_iter)); |
| |
| ColumnIteratorUPtr null_iterator; |
| if (is_nullable()) { |
| RETURN_IF_ERROR(_sub_readers[2]->new_iterator(&null_iterator, nullptr)); |
| } |
| *iterator = std::make_unique<ArrayFileColumnIterator>(shared_from_this(), std::move(ofcIter), |
| std::move(item_iterator), |
| std::move(null_iterator)); |
| return Status::OK(); |
| } |
| |
| Status ColumnReader::new_map_iterator(ColumnIteratorUPtr* iterator, |
| const TabletColumn* tablet_column) { |
| ColumnIteratorUPtr key_iterator; |
| RETURN_IF_ERROR(_sub_readers[0]->new_iterator( |
| &key_iterator, tablet_column && tablet_column->get_subtype_count() > 1 |
| ? &tablet_column->get_sub_column(0) |
| : nullptr)); |
| key_iterator->set_column_name(tablet_column ? tablet_column->get_sub_column(0).name() : ""); |
| ColumnIteratorUPtr val_iterator; |
| RETURN_IF_ERROR(_sub_readers[1]->new_iterator( |
| &val_iterator, tablet_column && tablet_column->get_subtype_count() > 1 |
| ? &tablet_column->get_sub_column(1) |
| : nullptr)); |
| val_iterator->set_column_name(tablet_column ? tablet_column->get_sub_column(1).name() : ""); |
| ColumnIteratorUPtr offsets_iterator; |
| RETURN_IF_ERROR(_sub_readers[2]->new_iterator(&offsets_iterator, nullptr)); |
| auto* file_iter = static_cast<FileColumnIterator*>(offsets_iterator.release()); |
| OffsetFileColumnIteratorUPtr ofcIter = std::make_unique<OffsetFileColumnIterator>( |
| std::unique_ptr<FileColumnIterator>(file_iter)); |
| |
| ColumnIteratorUPtr null_iterator; |
| if (is_nullable()) { |
| RETURN_IF_ERROR(_sub_readers[3]->new_iterator(&null_iterator, nullptr)); |
| } |
| *iterator = std::make_unique<MapFileColumnIterator>( |
| shared_from_this(), std::move(null_iterator), std::move(ofcIter), |
| std::move(key_iterator), std::move(val_iterator)); |
| return Status::OK(); |
| } |
| |
| Status ColumnReader::new_struct_iterator(ColumnIteratorUPtr* iterator, |
| const TabletColumn* tablet_column) { |
| std::vector<ColumnIteratorUPtr> sub_column_iterators; |
| size_t child_size = is_nullable() ? _sub_readers.size() - 1 : _sub_readers.size(); |
| size_t tablet_column_size = tablet_column ? tablet_column->get_sub_columns().size() : 0; |
| sub_column_iterators.reserve(child_size); |
| |
| for (uint64_t i = 0; i < child_size; i++) { |
| ColumnIteratorUPtr sub_column_iterator; |
| RETURN_IF_ERROR(_sub_readers[i]->new_iterator( |
| &sub_column_iterator, tablet_column ? &tablet_column->get_sub_column(i) : nullptr)); |
| sub_column_iterator->set_column_name(tablet_column ? tablet_column->get_sub_column(i).name() |
| : ""); |
| sub_column_iterators.emplace_back(std::move(sub_column_iterator)); |
| } |
| |
| // create default iterators for sub-columns added by schema change |
| for (size_t i = child_size; i < tablet_column_size; i++) { |
| TabletColumn column = tablet_column->get_sub_column(i); |
| ColumnIteratorUPtr it; |
| RETURN_IF_ERROR(Segment::new_default_iterator(column, &it)); |
| it->set_column_name(column.name()); |
| sub_column_iterators.emplace_back(std::move(it)); |
| } |
| |
| ColumnIteratorUPtr null_iterator; |
| if (is_nullable()) { |
| RETURN_IF_ERROR(_sub_readers[child_size]->new_iterator(&null_iterator, nullptr)); |
| } |
| *iterator = std::make_unique<StructFileColumnIterator>( |
| shared_from_this(), std::move(null_iterator), std::move(sub_column_iterators)); |
| return Status::OK(); |
| } |
| |
| Result<TColumnAccessPaths> ColumnIterator::_get_sub_access_paths( |
| const TColumnAccessPaths& access_paths) { |
| TColumnAccessPaths sub_access_paths = access_paths; |
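| // strip this column's own name from the front of every path; a path that becomes |
| // empty selects the column in full, so mark it for reading and drop the path, |
| // while non-empty remainders are forwarded to the sub iterators |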
| for (auto it = sub_access_paths.begin(); it != sub_access_paths.end();) { |
| TColumnAccessPath& name_path = *it; |
| if (name_path.data_access_path.path.empty()) { |
| return ResultError( |
| Status::InternalError("Invalid access path for struct column: path is empty")); |
| } |
| |
| if (!StringCaseEqual()(name_path.data_access_path.path[0], _column_name)) { |
| return ResultError(Status::InternalError( |
| R"(Invalid access path for column: expected name "{}", got "{}")", _column_name, |
| name_path.data_access_path.path[0])); |
| } |
| |
| name_path.data_access_path.path.erase(name_path.data_access_path.path.begin()); |
| if (!name_path.data_access_path.path.empty()) { |
| ++it; |
| } else { |
| set_need_to_read(); |
| it = sub_access_paths.erase(it); |
| } |
| } |
| return sub_access_paths; |
| } |
| |
| ///====================== MapFileColumnIterator ============================//// |
| MapFileColumnIterator::MapFileColumnIterator(std::shared_ptr<ColumnReader> reader, |
| ColumnIteratorUPtr null_iterator, |
| OffsetFileColumnIteratorUPtr offsets_iterator, |
| ColumnIteratorUPtr key_iterator, |
| ColumnIteratorUPtr val_iterator) |
| : _map_reader(std::move(reader)), |
| _offsets_iterator(std::move(offsets_iterator)), |
| _key_iterator(std::move(key_iterator)), |
| _val_iterator(std::move(val_iterator)) { |
| if (_map_reader->is_nullable()) { |
| _null_iterator = std::move(null_iterator); |
| } |
| } |
| |
| Status MapFileColumnIterator::init(const ColumnIteratorOptions& opts) { |
| if (_reading_flag == ReadingFlag::SKIP_READING) { |
| DLOG(INFO) << "Map column iterator column " << _column_name << " skip reading."; |
| return Status::OK(); |
| } |
| RETURN_IF_ERROR(_key_iterator->init(opts)); |
| RETURN_IF_ERROR(_val_iterator->init(opts)); |
| RETURN_IF_ERROR(_offsets_iterator->init(opts)); |
| if (_map_reader->is_nullable()) { |
| RETURN_IF_ERROR(_null_iterator->init(opts)); |
| } |
| return Status::OK(); |
| } |
| |
| Status MapFileColumnIterator::seek_to_ordinal(ordinal_t ord) { |
| if (_reading_flag == ReadingFlag::SKIP_READING) { |
| DLOG(INFO) << "Map column iterator column " << _column_name << " skip reading."; |
| return Status::OK(); |
| } |
| |
| if (_map_reader->is_nullable()) { |
| RETURN_IF_ERROR(_null_iterator->seek_to_ordinal(ord)); |
| } |
| RETURN_IF_ERROR(_offsets_iterator->seek_to_ordinal(ord)); |
| // use the offset info: map the row ordinal to the first element ordinal of key/value |
| ordinal_t offset = 0; |
| RETURN_IF_ERROR(_offsets_iterator->_peek_one_offset(&offset)); |
| RETURN_IF_ERROR(_key_iterator->seek_to_ordinal(offset)); |
| RETURN_IF_ERROR(_val_iterator->seek_to_ordinal(offset)); |
| return Status::OK(); |
| } |
| |
| Status MapFileColumnIterator::next_batch(size_t* n, vectorized::MutableColumnPtr& dst, |
| bool* has_null) { |
| if (_reading_flag == ReadingFlag::SKIP_READING) { |
| DLOG(INFO) << "Map column iterator column " << _column_name << " skip reading."; |
| dst->insert_many_defaults(*n); |
| return Status::OK(); |
| } |
| |
| auto& column_map = assert_cast<vectorized::ColumnMap&, TypeCheckOnRelease::DISABLE>( |
| dst->is_nullable() ? static_cast<vectorized::ColumnNullable&>(*dst).get_nested_column() |
| : *dst); |
| auto column_offsets_ptr = column_map.get_offsets_column().assume_mutable(); |
| bool offsets_has_null = false; |
| ssize_t start = column_offsets_ptr->size(); |
| RETURN_IF_ERROR(_offsets_iterator->next_batch(n, column_offsets_ptr, &offsets_has_null)); |
| if (*n == 0) { |
| return Status::OK(); |
| } |
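| // the number of map elements covered by this batch is the difference of the |
| // offsets prefix sums; PaddedPODArray makes the [start - 1] access valid |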
| auto& column_offsets = |
| static_cast<vectorized::ColumnArray::ColumnOffsets&>(*column_offsets_ptr); |
| RETURN_IF_ERROR(_offsets_iterator->_calculate_offsets(start, column_offsets)); |
| DCHECK(column_offsets.get_data().back() >= column_offsets.get_data()[start - 1]); |
| size_t num_items = |
| column_offsets.get_data().back() - column_offsets.get_data()[start - 1]; // -1 is valid |
| auto key_ptr = column_map.get_keys().assume_mutable(); |
| auto val_ptr = column_map.get_values().assume_mutable(); |
| |
| if (num_items > 0) { |
| size_t num_read = num_items; |
| bool key_has_null = false; |
| bool val_has_null = false; |
| RETURN_IF_ERROR(_key_iterator->next_batch(&num_read, key_ptr, &key_has_null)); |
| RETURN_IF_ERROR(_val_iterator->next_batch(&num_read, val_ptr, &val_has_null)); |
| DCHECK(num_read == num_items); |
| |
| column_map.get_keys_ptr() = std::move(key_ptr); |
| column_map.get_values_ptr() = std::move(val_ptr); |
| } |
| |
| if (dst->is_nullable()) { |
| size_t num_read = *n; |
| auto null_map_ptr = |
| static_cast<vectorized::ColumnNullable&>(*dst).get_null_map_column_ptr(); |
| // in not-null to null linked schema-change mode the underlying data (including |
| // the meta in the footer) is not rewritten, so dst may follow a changed meta |
| // that is nullable while the old data is not; in that case fill the null map |
| // with zeros (not null) by default |
| if (_null_iterator) { |
| bool null_signs_has_null = false; |
| RETURN_IF_ERROR( |
| _null_iterator->next_batch(&num_read, null_map_ptr, &null_signs_has_null)); |
| } else { |
| auto& null_map = assert_cast<vectorized::ColumnUInt8&, TypeCheckOnRelease::DISABLE>( |
| *null_map_ptr); |
| null_map.insert_many_vals(0, num_read); |
| } |
| DCHECK(num_read == *n); |
| } |
| return Status::OK(); |
| } |
| |
| Status MapFileColumnIterator::read_by_rowids(const rowid_t* rowids, const size_t count, |
| vectorized::MutableColumnPtr& dst) { |
| if (_reading_flag == ReadingFlag::SKIP_READING) { |
| DLOG(INFO) << "File column iterator column " << _column_name << " skip reading."; |
| dst->insert_many_defaults(count); |
| return Status::OK(); |
| } |
| if (count == 0) { |
| return Status::OK(); |
| } |
| // resolve ColumnMap and nullable wrapper |
| const auto* column_map = vectorized::check_and_get_column<vectorized::ColumnMap>( |
| dst->is_nullable() ? static_cast<vectorized::ColumnNullable&>(*dst).get_nested_column() |
| : *dst); |
| auto offsets_ptr = column_map->get_offsets_column().assume_mutable(); |
| auto& offsets = static_cast<vectorized::ColumnArray::ColumnOffsets&>(*offsets_ptr); |
| size_t base = offsets.get_data().empty() ? 0 : offsets.get_data().back(); |
| |
| // 1. bulk read null-map if nullable |
| std::vector<uint8_t> null_mask; // 0: not null, 1: null |
| if (_map_reader->is_nullable()) { |
| // For nullable map columns, the destination column must also be nullable. |
| if (UNLIKELY(!dst->is_nullable())) { |
| return Status::InternalError( |
| "unexpected non-nullable destination column for nullable map reader"); |
| } |
| auto null_map_ptr = |
| static_cast<vectorized::ColumnNullable&>(*dst).get_null_map_column_ptr(); |
| size_t null_before = null_map_ptr->size(); |
| RETURN_IF_ERROR(_null_iterator->read_by_rowids(rowids, count, null_map_ptr)); |
| // extract a light-weight view to decide element reads |
| auto& null_map_col = assert_cast<vectorized::ColumnUInt8&>(*null_map_ptr); |
| null_mask.reserve(count); |
| for (size_t i = 0; i < count; ++i) { |
| null_mask.push_back(null_map_col.get_element(null_before + i)); |
| } |
| } else if (dst->is_nullable()) { |
| // in not-null to null linked schema-change mode the underlying data (including |
| // the meta in the footer) is not rewritten, so dst may follow a changed meta |
| // that is nullable while the old data is not; in that case fill the null map |
| // with zeros (not null) by default |
| auto null_map_ptr = |
| static_cast<vectorized::ColumnNullable&>(*dst).get_null_map_column_ptr(); |
| auto& null_map = assert_cast<vectorized::ColumnUInt8&>(*null_map_ptr); |
| null_map.insert_many_vals(0, count); |
| } |
| |
| // 2. bulk read start ordinals for requested rows |
| vectorized::MutableColumnPtr starts_col = vectorized::ColumnOffset64::create(); |
| starts_col->reserve(count); |
| RETURN_IF_ERROR(_offsets_iterator->read_by_rowids(rowids, count, starts_col)); |
| |
| // 3. bulk read next-start ordinals for rowid+1 (within bounds) |
| std::vector<rowid_t> next_rowids(count); |
| for (size_t i = 0; i < count; ++i) { |
| uint64_t nr = rowids[i] + 1; |
| next_rowids[i] = nr < _map_reader->num_rows() ? static_cast<rowid_t>(nr) |
| : static_cast<rowid_t>(0); // placeholder |
| } |
| vectorized::MutableColumnPtr next_starts_col = vectorized::ColumnOffset64::create(); |
| next_starts_col->reserve(count); |
| // read for all; we'll fix out-of-bound cases below |
| RETURN_IF_ERROR(_offsets_iterator->read_by_rowids(next_rowids.data(), count, next_starts_col)); |
| |
| // 4. fix next_start for rows whose next_rowid is out-of-bound (rowid == num_rows-1) |
| for (size_t i = 0; i < count; ++i) { |
| if (rowids[i] + 1 >= _map_reader->num_rows()) { |
| // seek to the last row and consume one to move decoder to end-of-page, |
| // then peek page-tail sentinel next_array_item_ordinal as next_start |
| RETURN_IF_ERROR(_offsets_iterator->seek_to_ordinal(rowids[i])); |
| size_t one = 1; |
| bool has_null_unused = false; |
| vectorized::MutableColumnPtr tmp = vectorized::ColumnOffset64::create(); |
| RETURN_IF_ERROR(_offsets_iterator->next_batch(&one, tmp, &has_null_unused)); |
| ordinal_t ns = 0; |
| RETURN_IF_ERROR(_offsets_iterator->_peek_one_offset(&ns)); |
| // overwrite with sentinel |
| assert_cast<vectorized::ColumnOffset64&, TypeCheckOnRelease::DISABLE>(*next_starts_col) |
| .get_data()[i] = ns; |
| } |
| } |
| |
| // 5. compute sizes and append offsets prefix-sum |
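| // e.g. with base = 10 and per-row element counts {2, 0, 3}, the offsets appended |
| // below are {12, 12, 15}; a null row contributes 0 elements |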
| auto& starts_data = assert_cast<vectorized::ColumnOffset64&>(*starts_col).get_data(); |
| auto& next_starts_data = assert_cast<vectorized::ColumnOffset64&>(*next_starts_col).get_data(); |
| std::vector<size_t> sizes(count, 0); |
| size_t acc = base; |
| const auto original_size = offsets.get_data().back(); |
| offsets.get_data().reserve(offsets.get_data().size() + count); |
| for (size_t i = 0; i < count; ++i) { |
| size_t sz = static_cast<size_t>(next_starts_data[i] - starts_data[i]); |
| if (_map_reader->is_nullable() && !null_mask.empty() && null_mask[i]) { |
| sz = 0; // null rows do not consume elements |
| } |
| sizes[i] = sz; |
| acc += sz; |
| offsets.get_data().push_back(acc); |
| } |
| |
| // 6. read key/value elements for non-empty sizes |
| auto keys_ptr = column_map->get_keys().assume_mutable(); |
| auto vals_ptr = column_map->get_values().assume_mutable(); |
| |
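| // coalesce element ranges that are contiguous in the file into runs, so each |
| // run costs one seek plus one batch read per child iterator |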
| size_t this_run = sizes[0]; |
| auto start_idx = starts_data[0]; |
| auto last_idx = starts_data[0] + this_run; |
| for (size_t i = 1; i < count; ++i) { |
| size_t sz = sizes[i]; |
| if (sz == 0) { |
| continue; |
| } |
| auto start = static_cast<ordinal_t>(starts_data[i]); |
| if (start != last_idx) { |
| size_t n = this_run; |
| bool dummy_has_null = false; |
| |
| if (this_run != 0) { |
| if (_key_iterator->reading_flag() != ReadingFlag::SKIP_READING) { |
| RETURN_IF_ERROR(_key_iterator->seek_to_ordinal(start_idx)); |
| RETURN_IF_ERROR(_key_iterator->next_batch(&n, keys_ptr, &dummy_has_null)); |
| DCHECK(n == this_run); |
| } |
| |
| if (_val_iterator->reading_flag() != ReadingFlag::SKIP_READING) { |
| n = this_run; |
| RETURN_IF_ERROR(_val_iterator->seek_to_ordinal(start_idx)); |
| RETURN_IF_ERROR(_val_iterator->next_batch(&n, vals_ptr, &dummy_has_null)); |
| DCHECK(n == this_run); |
| } |
| } |
| start_idx = start; |
| this_run = sz; |
| last_idx = start + sz; |
| continue; |
| } |
| |
| this_run += sz; |
| last_idx += sz; |
| } |
| |
| size_t n = this_run; |
| const size_t total_count = offsets.get_data().back() - original_size; |
| bool dummy_has_null = false; |
| if (_key_iterator->reading_flag() != ReadingFlag::SKIP_READING) { |
| if (this_run != 0) { |
| RETURN_IF_ERROR(_key_iterator->seek_to_ordinal(start_idx)); |
| RETURN_IF_ERROR(_key_iterator->next_batch(&n, keys_ptr, &dummy_has_null)); |
| DCHECK(n == this_run); |
| } |
| } else { |
| keys_ptr->insert_many_defaults(total_count); |
| } |
| |
| if (_val_iterator->reading_flag() != ReadingFlag::SKIP_READING) { |
| if (this_run != 0) { |
| n = this_run; |
| RETURN_IF_ERROR(_val_iterator->seek_to_ordinal(start_idx)); |
| RETURN_IF_ERROR(_val_iterator->next_batch(&n, vals_ptr, &dummy_has_null)); |
| DCHECK(n == this_run); |
| } |
| } else { |
| vals_ptr->insert_many_defaults(total_count); |
| } |
| |
| return Status::OK(); |
| } |
| |
| void MapFileColumnIterator::set_need_to_read() { |
| set_reading_flag(ReadingFlag::NEED_TO_READ); |
| _key_iterator->set_need_to_read(); |
| _val_iterator->set_need_to_read(); |
| } |
| |
| void MapFileColumnIterator::remove_pruned_sub_iterators() { |
| _key_iterator->remove_pruned_sub_iterators(); |
| _val_iterator->remove_pruned_sub_iterators(); |
| } |
| |
| Status MapFileColumnIterator::set_access_paths(const TColumnAccessPaths& all_access_paths, |
| const TColumnAccessPaths& predicate_access_paths) { |
| if (all_access_paths.empty()) { |
| return Status::OK(); |
| } |
| |
| if (!predicate_access_paths.empty()) { |
| set_reading_flag(ReadingFlag::READING_FOR_PREDICATE); |
| DLOG(INFO) << "Map column iterator set sub-column " << _column_name |
| << " to READING_FOR_PREDICATE"; |
| } |
| |
| auto sub_all_access_paths = DORIS_TRY(_get_sub_access_paths(all_access_paths)); |
| auto sub_predicate_access_paths = DORIS_TRY(_get_sub_access_paths(predicate_access_paths)); |
| |
| if (sub_all_access_paths.empty()) { |
| return Status::OK(); |
| } |
| |
| TColumnAccessPaths key_all_access_paths; |
| TColumnAccessPaths val_all_access_paths; |
| TColumnAccessPaths key_predicate_access_paths; |
| TColumnAccessPaths val_predicate_access_paths; |
| |
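| // route each sub-path to the key and/or value child: "*" expands to both sides, |
| // "KEYS"/"VALUES" select one; the leading path element is rewritten to the |
| // child's actual column name before forwarding |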
| for (auto paths : sub_all_access_paths) { |
| if (paths.data_access_path.path[0] == "*") { |
| paths.data_access_path.path[0] = _key_iterator->column_name(); |
| key_all_access_paths.emplace_back(paths); |
| paths.data_access_path.path[0] = _val_iterator->column_name(); |
| val_all_access_paths.emplace_back(paths); |
| } else if (paths.data_access_path.path[0] == "KEYS") { |
| paths.data_access_path.path[0] = _key_iterator->column_name(); |
| key_all_access_paths.emplace_back(paths); |
| } else if (paths.data_access_path.path[0] == "VALUES") { |
| paths.data_access_path.path[0] = _val_iterator->column_name(); |
| val_all_access_paths.emplace_back(paths); |
| } |
| } |
| const auto need_read_keys = !key_all_access_paths.empty(); |
| const auto need_read_values = !val_all_access_paths.empty(); |
| |
| for (auto paths : sub_predicate_access_paths) { |
| if (paths.data_access_path.path[0] == "*") { |
| paths.data_access_path.path[0] = _key_iterator->column_name(); |
| key_predicate_access_paths.emplace_back(paths); |
| paths.data_access_path.path[0] = _val_iterator->column_name(); |
| val_predicate_access_paths.emplace_back(paths); |
| } else if (paths.data_access_path.path[0] == "KEYS") { |
| paths.data_access_path.path[0] = _key_iterator->column_name(); |
| key_predicate_access_paths.emplace_back(paths); |
| } else if (paths.data_access_path.path[0] == "VALUES") { |
| paths.data_access_path.path[0] = _val_iterator->column_name(); |
| val_predicate_access_paths.emplace_back(paths); |
| } |
| } |
| |
| if (need_read_keys) { |
| _key_iterator->set_reading_flag(ReadingFlag::NEED_TO_READ); |
| RETURN_IF_ERROR( |
| _key_iterator->set_access_paths(key_all_access_paths, key_predicate_access_paths)); |
| } else { |
| _key_iterator->set_reading_flag(ReadingFlag::SKIP_READING); |
| DLOG(INFO) << "Map column iterator set key column to SKIP_READING"; |
| } |
| |
| if (need_read_values) { |
| _val_iterator->set_reading_flag(ReadingFlag::NEED_TO_READ); |
| RETURN_IF_ERROR( |
| _val_iterator->set_access_paths(val_all_access_paths, val_predicate_access_paths)); |
| } else { |
| _val_iterator->set_reading_flag(ReadingFlag::SKIP_READING); |
| DLOG(INFO) << "Map column iterator set value column to SKIP_READING"; |
| } |
| return Status::OK(); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| |
| StructFileColumnIterator::StructFileColumnIterator( |
| std::shared_ptr<ColumnReader> reader, ColumnIteratorUPtr null_iterator, |
| std::vector<ColumnIteratorUPtr>&& sub_column_iterators) |
| : _struct_reader(std::move(reader)), _sub_column_iterators(std::move(sub_column_iterators)) { |
| if (_struct_reader->is_nullable()) { |
| _null_iterator = std::move(null_iterator); |
| } |
| } |
| |
| Status StructFileColumnIterator::init(const ColumnIteratorOptions& opts) { |
| if (_reading_flag == ReadingFlag::SKIP_READING) { |
| DLOG(INFO) << "Struct column iterator column " << _column_name << " skip reading."; |
| return Status::OK(); |
| } |
| |
| for (auto& column_iterator : _sub_column_iterators) { |
| RETURN_IF_ERROR(column_iterator->init(opts)); |
| } |
| if (_struct_reader->is_nullable()) { |
| RETURN_IF_ERROR(_null_iterator->init(opts)); |
| } |
| return Status::OK(); |
| } |
| |
| Status StructFileColumnIterator::next_batch(size_t* n, vectorized::MutableColumnPtr& dst, |
| bool* has_null) { |
| if (_reading_flag == ReadingFlag::SKIP_READING) { |
| DLOG(INFO) << "Struct column iterator column " << _column_name << " skip reading."; |
| dst->insert_many_defaults(*n); |
| return Status::OK(); |
| } |
| |
| auto& column_struct = assert_cast<vectorized::ColumnStruct&, TypeCheckOnRelease::DISABLE>( |
| dst->is_nullable() ? static_cast<vectorized::ColumnNullable&>(*dst).get_nested_column() |
| : *dst); |
| for (size_t i = 0; i < column_struct.tuple_size(); i++) { |
| size_t num_read = *n; |
| auto sub_column_ptr = column_struct.get_column(i).assume_mutable(); |
| bool column_has_null = false; |
| RETURN_IF_ERROR( |
| _sub_column_iterators[i]->next_batch(&num_read, sub_column_ptr, &column_has_null)); |
| DCHECK(num_read == *n); |
| column_struct.get_column_ptr(i) = std::move(sub_column_ptr); |
| } |
| |
| if (dst->is_nullable()) { |
| size_t num_read = *n; |
| auto null_map_ptr = |
| static_cast<vectorized::ColumnNullable&>(*dst).get_null_map_column_ptr(); |
| // in not-null to null linked schema-change mode the underlying data (including |
| // the meta in the footer) is not rewritten, so dst may follow a changed meta |
| // that is nullable while the old data is not; in that case fill the null map |
| // with zeros (not null) by default |
| if (_null_iterator) { |
| bool null_signs_has_null = false; |
| RETURN_IF_ERROR( |
| _null_iterator->next_batch(&num_read, null_map_ptr, &null_signs_has_null)); |
| } else { |
| auto& null_map = assert_cast<vectorized::ColumnUInt8&, TypeCheckOnRelease::DISABLE>( |
| *null_map_ptr); |
| null_map.insert_many_vals(0, num_read); |
| } |
| DCHECK(num_read == *n); |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status StructFileColumnIterator::seek_to_ordinal(ordinal_t ord) { |
| if (_reading_flag == ReadingFlag::SKIP_READING) { |
| DLOG(INFO) << "Struct column iterator column " << _column_name << " skip reading."; |
| return Status::OK(); |
| } |
| |
| for (auto& column_iterator : _sub_column_iterators) { |
| RETURN_IF_ERROR(column_iterator->seek_to_ordinal(ord)); |
| } |
| if (_struct_reader->is_nullable()) { |
| RETURN_IF_ERROR(_null_iterator->seek_to_ordinal(ord)); |
| } |
| return Status::OK(); |
| } |
| |
| Status StructFileColumnIterator::read_by_rowids(const rowid_t* rowids, const size_t count, |
| vectorized::MutableColumnPtr& dst) { |
| if (_reading_flag == ReadingFlag::SKIP_READING) { |
| DLOG(INFO) << "Struct column iterator column " << _column_name << " skip reading."; |
| dst->insert_many_defaults(count); |
| return Status::OK(); |
| } |
| |
| if (count == 0) { |
| return Status::OK(); |
| } |
| |
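| // Coalesce consecutive rowids into runs so that each run costs only one |
| // seek plus one batched read, e.g. rowids [5, 6, 7, 20] become the two |
| // runs [5..7] and [20..20]. |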
| size_t this_run = 1; |
| auto start_idx = rowids[0]; |
| auto last_idx = rowids[0]; |
| for (size_t i = 1; i < count; ++i) { |
| if (last_idx == rowids[i] - 1) { |
| last_idx = rowids[i]; |
| this_run++; |
| continue; |
| } |
| RETURN_IF_ERROR(seek_to_ordinal(start_idx)); |
| size_t num_read = this_run; |
| RETURN_IF_ERROR(next_batch(&num_read, dst, nullptr)); |
| DCHECK_EQ(num_read, this_run); |
| |
| start_idx = rowids[i]; |
| last_idx = rowids[i]; |
| this_run = 1; |
| } |
| |
| RETURN_IF_ERROR(seek_to_ordinal(start_idx)); |
| size_t num_read = this_run; |
| RETURN_IF_ERROR(next_batch(&num_read, dst, nullptr)); |
| DCHECK_EQ(num_read, this_run); |
| return Status::OK(); |
| } |
| |
| void StructFileColumnIterator::set_need_to_read() { |
| set_reading_flag(ReadingFlag::NEED_TO_READ); |
| for (auto& sub_iterator : _sub_column_iterators) { |
| sub_iterator->set_need_to_read(); |
| } |
| } |
| |
| void StructFileColumnIterator::remove_pruned_sub_iterators() { |
| for (auto it = _sub_column_iterators.begin(); it != _sub_column_iterators.end();) { |
| auto& sub_iterator = *it; |
| if (sub_iterator->reading_flag() == ReadingFlag::SKIP_READING) { |
| DLOG(INFO) << "Struct column iterator remove pruned sub-column " |
| << sub_iterator->column_name(); |
| it = _sub_column_iterators.erase(it); |
| } else { |
| sub_iterator->remove_pruned_sub_iterators(); |
| ++it; |
| } |
| } |
| } |
| |
| Status StructFileColumnIterator::set_access_paths( |
| const TColumnAccessPaths& all_access_paths, |
| const TColumnAccessPaths& predicate_access_paths) { |
| if (all_access_paths.empty()) { |
| return Status::OK(); |
| } |
| |
| if (!predicate_access_paths.empty()) { |
| set_reading_flag(ReadingFlag::READING_FOR_PREDICATE); |
| DLOG(INFO) << "Struct column iterator set sub-column " << _column_name |
| << " to READING_FOR_PREDICATE"; |
| } |
| auto sub_all_access_paths = DORIS_TRY(_get_sub_access_paths(all_access_paths)); |
| auto sub_predicate_access_paths = DORIS_TRY(_get_sub_access_paths(predicate_access_paths)); |
| |
| const auto no_sub_column_to_skip = sub_all_access_paths.empty(); |
| const auto no_predicate_sub_column = sub_predicate_access_paths.empty(); |
| |
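| // Match each sub-column against the first component of the sub access |
| // paths: if no sub access paths remain, every sub-column is read; a |
| // sub-column with no matching path is pruned (SKIP_READING); matching |
| // paths are pushed down to the sub-iterator recursively. |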
| for (auto& sub_iterator : _sub_column_iterators) { |
| const auto name = sub_iterator->column_name(); |
| bool need_to_read = no_sub_column_to_skip; |
| TColumnAccessPaths sub_all_access_paths_of_this; |
| if (!need_to_read) { |
| for (const auto& paths : sub_all_access_paths) { |
| if (paths.data_access_path.path[0] == name) { |
| sub_all_access_paths_of_this.emplace_back(paths); |
| } |
| } |
| need_to_read = !sub_all_access_paths_of_this.empty(); |
| } |
| |
| if (!need_to_read) { |
| set_reading_flag(ReadingFlag::SKIP_READING); |
| sub_iterator->set_reading_flag(ReadingFlag::SKIP_READING); |
| DLOG(INFO) << "Struct column iterator set sub-column " << name << " to SKIP_READING"; |
| continue; |
| } |
| set_reading_flag(ReadingFlag::NEED_TO_READ); |
| sub_iterator->set_reading_flag(ReadingFlag::NEED_TO_READ); |
| |
| TColumnAccessPaths sub_predicate_access_paths_of_this; |
| |
| if (!no_predicate_sub_column) { |
| for (const auto& paths : sub_predicate_access_paths) { |
| if (StringCaseEqual()(paths.data_access_path.path[0], name)) { |
| sub_predicate_access_paths_of_this.emplace_back(paths); |
| } |
| } |
| } |
| |
| RETURN_IF_ERROR(sub_iterator->set_access_paths(sub_all_access_paths_of_this, |
| sub_predicate_access_paths_of_this)); |
| } |
| return Status::OK(); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| Status OffsetFileColumnIterator::init(const ColumnIteratorOptions& opts) { |
| RETURN_IF_ERROR(_offset_iterator->init(opts)); |
| // allocate peek tmp column once |
| _peek_tmp_col = vectorized::ColumnOffset64::create(); |
| return Status::OK(); |
| } |
| |
| Status OffsetFileColumnIterator::next_batch(size_t* n, vectorized::MutableColumnPtr& dst, |
| bool* has_null) { |
| RETURN_IF_ERROR(_offset_iterator->next_batch(n, dst, has_null)); |
| return Status::OK(); |
| } |
| |
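| // Peek the next offset value without advancing the decoder. When the |
| // current page is exhausted, fall back to next_array_item_ordinal, the |
| // first item ordinal of the page that follows, recorded in the parsed page. |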
| Status OffsetFileColumnIterator::_peek_one_offset(ordinal_t* offset) { |
| if (_offset_iterator->get_current_page()->has_remaining()) { |
| PageDecoder* offset_page_decoder = _offset_iterator->get_current_page()->data_decoder.get(); |
| size_t n = 1; |
| _peek_tmp_col->clear(); |
| RETURN_IF_ERROR(offset_page_decoder->peek_next_batch(&n, _peek_tmp_col)); // not null |
| DCHECK(_peek_tmp_col->size() == 1); |
| *offset = assert_cast<const vectorized::ColumnOffset64*, TypeCheckOnRelease::DISABLE>( |
| _peek_tmp_col.get()) |
| ->get_element(0); |
| } else { |
| *offset = _offset_iterator->get_current_page()->next_array_item_ordinal; |
| } |
| return Status::OK(); |
| } |
| |
| /** |
| * first_storage_offset, read from the current page, must be no larger than |
| * next_storage_offset, which is obtained here by calling _peek_one_offset on |
| * the following page; first_column_offset lives in the in-memory column and |
| * is in a different dimension than first_storage_offset/next_storage_offset. |
| * eg. step1. read page: first_storage_offset = 16382 |
| * step2. peek the page below with _peek_one_offset(&last_offset): last_offset = 16387 |
| * step3. first_column_offset = 126, calculated from the in-memory column offsets |
| * Looping over the column offsets, we translate each storage offset between |
| * first_storage_offset and next_storage_offset one by one to fill |
| * offsets_data of the in-memory column offsets. |
| * @param start |
| * @param column_offsets |
| * @return |
| */ |
| Status OffsetFileColumnIterator::_calculate_offsets( |
| ssize_t start, vectorized::ColumnArray::ColumnOffsets& column_offsets) { |
| ordinal_t next_storage_offset = 0; |
| RETURN_IF_ERROR(_peek_one_offset(&next_storage_offset)); |
| |
| // calculate real offsets |
| auto& offsets_data = column_offsets.get_data(); |
| ordinal_t first_column_offset = offsets_data[start - 1]; // -1 is valid: PaddedPODArray pads the front with 0 |
| ordinal_t first_storage_offset = offsets_data[start]; |
| DCHECK(next_storage_offset >= first_storage_offset); |
| for (ssize_t i = start; i < offsets_data.size() - 1; ++i) { |
| offsets_data[i] = first_column_offset + (offsets_data[i + 1] - first_storage_offset); |
| } |
| // last offset |
| offsets_data[offsets_data.size() - 1] = |
| first_column_offset + (next_storage_offset - first_storage_offset); |
| return Status::OK(); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| ArrayFileColumnIterator::ArrayFileColumnIterator(std::shared_ptr<ColumnReader> reader, |
| OffsetFileColumnIteratorUPtr offset_reader, |
| ColumnIteratorUPtr item_iterator, |
| ColumnIteratorUPtr null_iterator) |
| : _array_reader(reader), |
| _offset_iterator(std::move(offset_reader)), |
| _item_iterator(std::move(item_iterator)) { |
| if (_array_reader->is_nullable()) { |
| _null_iterator = std::move(null_iterator); |
| } |
| } |
| |
| Status ArrayFileColumnIterator::init(const ColumnIteratorOptions& opts) { |
| if (_reading_flag == ReadingFlag::SKIP_READING) { |
| DLOG(INFO) << "Array column iterator column " << _column_name << " skip reading."; |
| return Status::OK(); |
| } |
| |
| RETURN_IF_ERROR(_offset_iterator->init(opts)); |
| RETURN_IF_ERROR(_item_iterator->init(opts)); |
| if (_array_reader->is_nullable()) { |
| RETURN_IF_ERROR(_null_iterator->init(opts)); |
| } |
| return Status::OK(); |
| } |
| |
| Status ArrayFileColumnIterator::_seek_by_offsets(ordinal_t ord) { |
| // position the item iterator using the offset peeked from the offset column |
| ordinal_t offset = 0; |
| RETURN_IF_ERROR(_offset_iterator->_peek_one_offset(&offset)); |
| RETURN_IF_ERROR(_item_iterator->seek_to_ordinal(offset)); |
| return Status::OK(); |
| } |
| |
| Status ArrayFileColumnIterator::seek_to_ordinal(ordinal_t ord) { |
| if (_reading_flag == ReadingFlag::SKIP_READING) { |
| DLOG(INFO) << "Array column iterator column " << _column_name << " skip reading."; |
| return Status::OK(); |
| } |
| |
| RETURN_IF_ERROR(_offset_iterator->seek_to_ordinal(ord)); |
| if (_array_reader->is_nullable()) { |
| RETURN_IF_ERROR(_null_iterator->seek_to_ordinal(ord)); |
| } |
| return _seek_by_offsets(ord); |
| } |
| |
| Status ArrayFileColumnIterator::next_batch(size_t* n, vectorized::MutableColumnPtr& dst, |
| bool* has_null) { |
| if (_reading_flag == ReadingFlag::SKIP_READING) { |
| DLOG(INFO) << "Array column iterator column " << _column_name << " skip reading."; |
| dst->insert_many_defaults(*n); |
| return Status::OK(); |
| } |
| |
| const auto* column_array = vectorized::check_and_get_column<vectorized::ColumnArray>( |
| dst->is_nullable() ? static_cast<vectorized::ColumnNullable&>(*dst).get_nested_column() |
| : *dst); |
| |
| bool offsets_has_null = false; |
| auto column_offsets_ptr = column_array->get_offsets_column().assume_mutable(); |
| ssize_t start = column_offsets_ptr->size(); |
| RETURN_IF_ERROR(_offset_iterator->next_batch(n, column_offsets_ptr, &offsets_has_null)); |
| if (*n == 0) { |
| return Status::OK(); |
| } |
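| // The offsets just read are item ordinals in storage; translate them into |
| // in-memory offsets, then read all items spanned by the *n arrays in one |
| // batch from the item iterator. |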
| auto& column_offsets = |
| static_cast<vectorized::ColumnArray::ColumnOffsets&>(*column_offsets_ptr); |
| RETURN_IF_ERROR(_offset_iterator->_calculate_offsets(start, column_offsets)); |
| size_t num_items = |
| column_offsets.get_data().back() - column_offsets.get_data()[start - 1]; // -1 is valid (front pad) |
| auto column_items_ptr = column_array->get_data().assume_mutable(); |
| if (num_items > 0) { |
| size_t num_read = num_items; |
| bool items_has_null = false; |
| RETURN_IF_ERROR(_item_iterator->next_batch(&num_read, column_items_ptr, &items_has_null)); |
| DCHECK(num_read == num_items); |
| } |
| |
| if (dst->is_nullable()) { |
| auto null_map_ptr = |
| static_cast<vectorized::ColumnNullable&>(*dst).get_null_map_column_ptr(); |
| size_t num_read = *n; |
| // In a not-null to null linked schema change, neither the data nor the |
| // meta in the footer is rewritten, so dst may be nullable according to |
| // the changed meta while the old data has no null map. In that case, |
| // fill the null map with zeros (all rows not null). |
| if (_null_iterator) { |
| bool null_signs_has_null = false; |
| RETURN_IF_ERROR( |
| _null_iterator->next_batch(&num_read, null_map_ptr, &null_signs_has_null)); |
| } else { |
| auto& null_map = assert_cast<vectorized::ColumnUInt8&, TypeCheckOnRelease::DISABLE>( |
| *null_map_ptr); |
| null_map.insert_many_vals(0, num_read); |
| } |
| DCHECK(num_read == *n); |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status ArrayFileColumnIterator::read_by_rowids(const rowid_t* rowids, const size_t count, |
| vectorized::MutableColumnPtr& dst) { |
| if (_reading_flag == ReadingFlag::SKIP_READING) { |
| DLOG(INFO) << "Array column iterator column " << _column_name << " skip reading."; |
| dst->insert_many_defaults(count); |
| return Status::OK(); |
| } |
| |
| for (size_t i = 0; i < count; ++i) { |
| // TODO(cambyzju): now read array one by one, need optimize later |
| RETURN_IF_ERROR(seek_to_ordinal(rowids[i])); |
| size_t num_read = 1; |
| RETURN_IF_ERROR(next_batch(&num_read, dst, nullptr)); |
| DCHECK(num_read == 1); |
| } |
| return Status::OK(); |
| } |
| |
| void ArrayFileColumnIterator::set_need_to_read() { |
| set_reading_flag(ReadingFlag::NEED_TO_READ); |
| _item_iterator->set_need_to_read(); |
| } |
| |
| void ArrayFileColumnIterator::remove_pruned_sub_iterators() { |
| _item_iterator->remove_pruned_sub_iterators(); |
| } |
| |
| Status ArrayFileColumnIterator::set_access_paths(const TColumnAccessPaths& all_access_paths, |
| const TColumnAccessPaths& predicate_access_paths) { |
| if (all_access_paths.empty()) { |
| return Status::OK(); |
| } |
| |
| if (!predicate_access_paths.empty()) { |
| set_reading_flag(ReadingFlag::READING_FOR_PREDICATE); |
| DLOG(INFO) << "Array column iterator set sub-column " << _column_name |
| << " to READING_FOR_PREDICATE"; |
| } |
| |
| auto sub_all_access_paths = DORIS_TRY(_get_sub_access_paths(all_access_paths)); |
| auto sub_predicate_access_paths = DORIS_TRY(_get_sub_access_paths(predicate_access_paths)); |
| |
| const auto no_sub_column_to_skip = sub_all_access_paths.empty(); |
| const auto no_predicate_sub_column = sub_predicate_access_paths.empty(); |
| |
| if (!no_sub_column_to_skip) { |
| for (auto& path : sub_all_access_paths) { |
| if (path.data_access_path.path[0] == "*") { |
| path.data_access_path.path[0] = _item_iterator->column_name(); |
| } |
| } |
| } |
| |
| if (!no_predicate_sub_column) { |
| for (auto& path : sub_predicate_access_paths) { |
| if (path.data_access_path.path[0] == "*") { |
| path.data_access_path.path[0] = _item_iterator->column_name(); |
| } |
| } |
| } |
| |
| if (!no_sub_column_to_skip || !no_predicate_sub_column) { |
| _item_iterator->set_reading_flag(ReadingFlag::NEED_TO_READ); |
| RETURN_IF_ERROR( |
| _item_iterator->set_access_paths(sub_all_access_paths, sub_predicate_access_paths)); |
| } |
| return Status::OK(); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| |
| FileColumnIterator::FileColumnIterator(std::shared_ptr<ColumnReader> reader) : _reader(reader) {} |
| |
| Status FileColumnIterator::init(const ColumnIteratorOptions& opts) { |
| if (_reading_flag == ReadingFlag::SKIP_READING) { |
| DLOG(INFO) << "File column iterator column " << _column_name << " skip reading."; |
| return Status::OK(); |
| } |
| |
| _opts = opts; |
| if (!_opts.use_page_cache) { |
| _reader->disable_index_meta_cache(); |
| } |
| RETURN_IF_ERROR(get_block_compression_codec(_reader->get_compression(), &_compress_codec)); |
| if (config::enable_low_cardinality_optimize && |
| opts.io_ctx.reader_type == ReaderType::READER_QUERY && |
| _reader->encoding_info()->encoding() == DICT_ENCODING) { |
| auto dict_encoding_type = _reader->get_dict_encoding_type(); |
| // Only if the column is a predicate column do we need to check the |
| // all-dict-encoding flag, because then the predicate can be rewritten to |
| // accelerate the query. For a non-predicate column the check is useless |
| // and hurts cold reads (first-time reads), since it loads the column's |
| // ordinal index, zone-map index and possibly other indices. That is |
| // especially bad for primary key point queries, e.g. |
| // `SELECT * FROM table WHERE pk = 1` on a table with 2000 columns. |
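| // Checking only the last page suffices: the page builder switches from |
| // dictionary to plain encoding once the dictionary is full and never |
| // switches back, so a dict-encoded last page implies all pages are |
| // dict-encoded. |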
| if (dict_encoding_type == ColumnReader::UNKNOWN_DICT_ENCODING && opts.is_predicate_column) { |
| RETURN_IF_ERROR(seek_to_ordinal(_reader->num_rows() - 1)); |
| _is_all_dict_encoding = _page.is_dict_encoding; |
| _reader->set_dict_encoding_type(_is_all_dict_encoding |
| ? ColumnReader::ALL_DICT_ENCODING |
| : ColumnReader::PARTIAL_DICT_ENCODING); |
| } else { |
| _is_all_dict_encoding = dict_encoding_type == ColumnReader::ALL_DICT_ENCODING; |
| } |
| } |
| return Status::OK(); |
| } |
| |
| FileColumnIterator::~FileColumnIterator() = default; |
| |
| Status FileColumnIterator::seek_to_ordinal(ordinal_t ord) { |
| if (_reading_flag == ReadingFlag::SKIP_READING) { |
| DLOG(INFO) << "File column iterator column " << _column_name << " skip reading."; |
| return Status::OK(); |
| } |
| |
| // if current page contains this row, we don't need to seek |
| if (!_page || !_page.contains(ord) || !_page_iter.valid()) { |
| RETURN_IF_ERROR(_reader->seek_at_or_before(ord, &_page_iter, _opts)); |
| RETURN_IF_ERROR(_read_data_page(_page_iter)); |
| } |
| RETURN_IF_ERROR(_seek_to_pos_in_page(&_page, ord - _page.first_ordinal)); |
| _current_ordinal = ord; |
| return Status::OK(); |
| } |
| |
| Status FileColumnIterator::seek_to_page_start() { |
| return seek_to_ordinal(_page.first_ordinal); |
| } |
| |
| Status FileColumnIterator::_seek_to_pos_in_page(ParsedPage* page, ordinal_t offset_in_page) const { |
| if (page->offset_in_page == offset_in_page) { |
| // fast path, do nothing |
| return Status::OK(); |
| } |
| |
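| // The data decoder stores only non-null values, so the target position in |
| // the decoder equals the page offset minus the number of nulls before it; |
| // e.g. for page values [1, NULL, 2, NULL, 3], page offset 4 maps to data |
| // position 2. |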
| ordinal_t pos_in_data = offset_in_page; |
| if (_page.has_null) { |
| ordinal_t offset_in_data = 0; |
| ordinal_t skips = offset_in_page; |
| |
| if (offset_in_page > page->offset_in_page) { |
| // forward, reuse null bitmap |
| skips = offset_in_page - page->offset_in_page; |
| offset_in_data = page->data_decoder->current_index(); |
| } else { |
| // backward: rewind the null bitmap to the page start |
| page->null_decoder = RleDecoder<bool>((const uint8_t*)page->null_bitmap.data, |
| cast_set<int>(page->null_bitmap.size), 1); |
| } |
| |
| auto skip_nulls = page->null_decoder.Skip(skips); |
| pos_in_data = offset_in_data + skips - skip_nulls; |
| } |
| |
| RETURN_IF_ERROR(page->data_decoder->seek_to_position_in_page(pos_in_data)); |
| page->offset_in_page = offset_in_page; |
| return Status::OK(); |
| } |
| |
| Status FileColumnIterator::next_batch_of_zone_map(size_t* n, vectorized::MutableColumnPtr& dst) { |
| return _reader->next_batch_of_zone_map(n, dst); |
| } |
| |
| Status FileColumnIterator::next_batch(size_t* n, vectorized::MutableColumnPtr& dst, |
| bool* has_null) { |
| if (_reading_flag == ReadingFlag::SKIP_READING) { |
| DLOG(INFO) << "File column iterator column " << _column_name << " skip reading."; |
| dst->insert_many_defaults(*n); |
| return Status::OK(); |
| } |
| |
| size_t curr_size = dst->byte_size(); |
| dst->reserve(*n); |
| size_t remaining = *n; |
| *has_null = false; |
| while (remaining > 0) { |
| if (!_page.has_remaining()) { |
| bool eos = false; |
| RETURN_IF_ERROR(_load_next_page(&eos)); |
| if (eos) { |
| break; |
| } |
| } |
| |
| // number of rows to be read from this page |
| size_t nrows_in_page = std::min(remaining, _page.remaining()); |
| size_t nrows_to_read = nrows_in_page; |
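| // Walk the null bitmap run by run: GetNextRun returns the length of the |
| // next maximal run of rows that are all null or all non-null. Non-null |
| // runs are decoded from the data page, while null runs insert defaults |
| // into the nullable destination column. |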
| if (_page.has_null) { |
| while (nrows_to_read > 0) { |
| bool is_null = false; |
| size_t this_run = _page.null_decoder.GetNextRun(&is_null, nrows_to_read); |
| // num_rows is used only for the DCHECK below |
| size_t num_rows = this_run; |
| if (!is_null) { |
| RETURN_IF_ERROR(_page.data_decoder->next_batch(&num_rows, dst)); |
| DCHECK_EQ(this_run, num_rows); |
| } else { |
| *has_null = true; |
| auto* null_col = |
| vectorized::check_and_get_column<vectorized::ColumnNullable>(dst.get()); |
| if (null_col != nullptr) { |
| null_col->insert_many_defaults(this_run); |
| } else { |
| return Status::InternalError("unexpected column type in column reader"); |
| } |
| } |
| |
| nrows_to_read -= this_run; |
| _page.offset_in_page += this_run; |
| _current_ordinal += this_run; |
| } |
| } else { |
| RETURN_IF_ERROR(_page.data_decoder->next_batch(&nrows_to_read, dst)); |
| DCHECK_EQ(nrows_to_read, nrows_in_page); |
| |
| _page.offset_in_page += nrows_to_read; |
| _current_ordinal += nrows_to_read; |
| } |
| remaining -= nrows_in_page; |
| } |
| *n -= remaining; |
| _opts.stats->bytes_read += (dst->byte_size() - curr_size) + BitmapSize(*n); |
| |
| #ifdef BE_TEST |
| _reader->check_data_by_zone_map_for_test(dst); |
| #endif |
| return Status::OK(); |
| } |
| |
| Status FileColumnIterator::read_by_rowids(const rowid_t* rowids, const size_t count, |
| vectorized::MutableColumnPtr& dst) { |
| if (_reading_flag == ReadingFlag::SKIP_READING) { |
| DLOG(INFO) << "File column iterator column " << _column_name << " skip reading."; |
| dst->insert_many_defaults(count); |
| return Status::OK(); |
| } |
| |
| size_t remaining = count; |
| size_t total_read_count = 0; |
| size_t nrows_to_read = 0; |
| while (remaining > 0) { |
| RETURN_IF_ERROR(seek_to_ordinal(rowids[total_read_count])); |
| |
| // number of rows to be read from this page |
| nrows_to_read = std::min(remaining, _page.remaining()); |
| |
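| // With nulls present, walk the page run by run; within each run read only |
| // the requested rowids that fall inside it, then advance the data decoder |
| // past the whole run so that it stays in sync with the null bitmap. |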
| if (_page.has_null) { |
| size_t already_read = 0; |
| while ((nrows_to_read - already_read) > 0) { |
| bool is_null = false; |
| size_t this_run = std::min(nrows_to_read - already_read, _page.remaining()); |
| if (UNLIKELY(this_run == 0)) { |
| break; |
| } |
| this_run = _page.null_decoder.GetNextRun(&is_null, this_run); |
| size_t offset = total_read_count + already_read; |
| size_t this_read_count = 0; |
| rowid_t current_ordinal_in_page = |
| cast_set<uint32_t>(_page.offset_in_page + _page.first_ordinal); |
| for (size_t i = 0; i < this_run; ++i) { |
| if (rowids[offset + i] - current_ordinal_in_page >= this_run) { |
| break; |
| } |
| this_read_count++; |
| } |
| |
| auto origin_index = _page.data_decoder->current_index(); |
| if (this_read_count > 0) { |
| if (is_null) { |
| auto* null_col = |
| vectorized::check_and_get_column<vectorized::ColumnNullable>( |
| dst.get()); |
| if (UNLIKELY(null_col == nullptr)) { |
| return Status::InternalError("unexpected column type in column reader"); |
| } |
| |
| null_col->insert_many_defaults(this_read_count); |
| } else { |
| size_t read_count = this_read_count; |
| |
| // Ordinals in a nullable column's data buffer may not be contiguous |
| // (the buffer holds no null values), so we use `page_start_off_in_decoder` |
| // to compute the actual offset in `data_decoder`. |
| size_t page_start_off_in_decoder = |
| _page.first_ordinal + _page.offset_in_page - origin_index; |
| RETURN_IF_ERROR(_page.data_decoder->read_by_rowids( |
| &rowids[offset], page_start_off_in_decoder, &read_count, dst)); |
| DCHECK_EQ(read_count, this_read_count); |
| } |
| } |
| |
| if (!is_null) { |
| RETURN_IF_ERROR( |
| _page.data_decoder->seek_to_position_in_page(origin_index + this_run)); |
| } |
| |
| already_read += this_read_count; |
| _page.offset_in_page += this_run; |
| DCHECK(_page.offset_in_page <= _page.num_rows); |
| } |
| |
| nrows_to_read = already_read; |
| total_read_count += nrows_to_read; |
| remaining -= nrows_to_read; |
| } else { |
| RETURN_IF_ERROR(_page.data_decoder->read_by_rowids( |
| &rowids[total_read_count], _page.first_ordinal, &nrows_to_read, dst)); |
| total_read_count += nrows_to_read; |
| remaining -= nrows_to_read; |
| } |
| } |
| return Status::OK(); |
| } |
| |
| Status FileColumnIterator::_load_next_page(bool* eos) { |
| _page_iter.next(); |
| if (!_page_iter.valid()) { |
| *eos = true; |
| return Status::OK(); |
| } |
| |
| RETURN_IF_ERROR(_read_data_page(_page_iter)); |
| RETURN_IF_ERROR(_seek_to_pos_in_page(&_page, 0)); |
| *eos = false; |
| return Status::OK(); |
| } |
| |
| Status FileColumnIterator::_read_data_page(const OrdinalPageIndexIterator& iter) { |
| PageHandle handle; |
| Slice page_body; |
| PageFooterPB footer; |
| _opts.type = DATA_PAGE; |
| RETURN_IF_ERROR( |
| _reader->read_page(_opts, iter.page(), &handle, &page_body, &footer, _compress_codec)); |
| // parse data page |
| RETURN_IF_ERROR(ParsedPage::create(std::move(handle), page_body, footer.data_page_footer(), |
| _reader->encoding_info(), iter.page(), iter.page_index(), |
| &_page)); |
| |
| // The dictionary page is read lazily, when the first data page that uses |
| // it is read. This optimizes memory usage: when a column is never queried, |
| // the memory of its dictionary page can be released. Note that concurrent |
| // iterators for the same column won't repeatedly read the dictionary page, |
| // thanks to the page cache. |
| if (_reader->encoding_info()->encoding() == DICT_ENCODING) { |
| auto dict_page_decoder = reinterpret_cast<BinaryDictPageDecoder*>(_page.data_decoder.get()); |
| if (dict_page_decoder->is_dict_encoding()) { |
| if (_dict_decoder == nullptr) { |
| RETURN_IF_ERROR(_read_dict_data()); |
| CHECK_NOTNULL(_dict_decoder); |
| } |
| |
| dict_page_decoder->set_dict_decoder(cast_set<uint32_t>(_dict_decoder->count()), |
| _dict_word_info.get()); |
| } |
| } |
| return Status::OK(); |
| } |
| |
| Status FileColumnIterator::_read_dict_data() { |
| CHECK_EQ(_reader->encoding_info()->encoding(), DICT_ENCODING); |
| // read dictionary page |
| Slice dict_data; |
| PageFooterPB dict_footer; |
| _opts.type = INDEX_PAGE; |
| |
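| // The dictionary page is decoded as a VARCHAR page using the encoding |
| // recorded in its footer; _dict_word_info then caches a StringRef per |
| // dictionary entry so predicates can be evaluated directly against the |
| // dictionary words. |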
| RETURN_IF_ERROR(_reader->read_page(_opts, _reader->get_dict_page_pointer(), &_dict_page_handle, |
| &dict_data, &dict_footer, _compress_codec, true)); |
| const EncodingInfo* encoding_info; |
| RETURN_IF_ERROR(EncodingInfo::get(FieldType::OLAP_FIELD_TYPE_VARCHAR, |
| dict_footer.dict_page_footer().encoding(), {}, |
| &encoding_info)); |
| RETURN_IF_ERROR(encoding_info->create_page_decoder(dict_data, {}, _dict_decoder)); |
| RETURN_IF_ERROR(_dict_decoder->init()); |
| |
| _dict_word_info.reset(new StringRef[_dict_decoder->count()]); |
| RETURN_IF_ERROR(_dict_decoder->get_dict_word_info(_dict_word_info.get())); |
| return Status::OK(); |
| } |
| |
| Status FileColumnIterator::get_row_ranges_by_zone_map( |
| const AndBlockColumnPredicate* col_predicates, |
| const std::vector<std::shared_ptr<const ColumnPredicate>>* delete_predicates, |
| RowRanges* row_ranges) { |
| if (_reader->has_zone_map()) { |
| RETURN_IF_ERROR(_reader->get_row_ranges_by_zone_map(col_predicates, delete_predicates, |
| row_ranges, _opts)); |
| } |
| return Status::OK(); |
| } |
| |
| Status FileColumnIterator::get_row_ranges_by_bloom_filter( |
| const AndBlockColumnPredicate* col_predicates, RowRanges* row_ranges) { |
| if ((col_predicates->can_do_bloom_filter(false) && _reader->has_bloom_filter_index(false)) || |
| (col_predicates->can_do_bloom_filter(true) && _reader->has_bloom_filter_index(true))) { |
| RETURN_IF_ERROR(_reader->get_row_ranges_by_bloom_filter(col_predicates, row_ranges, _opts)); |
| } |
| return Status::OK(); |
| } |
| |
| Status FileColumnIterator::get_row_ranges_by_dict(const AndBlockColumnPredicate* col_predicates, |
| RowRanges* row_ranges) { |
| if (!_is_all_dict_encoding) { |
| return Status::OK(); |
| } |
| |
| if (!_dict_decoder) { |
| RETURN_IF_ERROR(_read_dict_data()); |
| CHECK_NOTNULL(_dict_decoder); |
| } |
| |
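| // If no dictionary word satisfies the predicate conjunction, no row of |
| // this column can match, so clear the row ranges to prune the whole read. |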
| if (!col_predicates->evaluate_and(_dict_word_info.get(), _dict_decoder->count())) { |
| row_ranges->clear(); |
| } |
| return Status::OK(); |
| } |
| |
| Status DefaultValueColumnIterator::init(const ColumnIteratorOptions& opts) { |
| _opts = opts; |
| // Be consistent with segment v1: if _has_default_value, we create a |
| // default column iterator for this column, and "NULL" is a special |
| // default value meaning the default value is null. |
| if (_has_default_value) { |
| if (_default_value == "NULL") { |
| _is_default_value_null = true; |
| } else { |
| _type_size = _type_info->size(); |
| _mem_value.resize(_type_size); |
| Status s = Status::OK(); |
| // If the CHAR length is 10 but the default value is 'a', its length is 1. |
| // Do not pad zeros to the end, because the segment iterator will trim |
| // trailing zero chars. |
| if (_type_info->type() == FieldType::OLAP_FIELD_TYPE_VARCHAR || |
| _type_info->type() == FieldType::OLAP_FIELD_TYPE_HLL || |
| _type_info->type() == FieldType::OLAP_FIELD_TYPE_BITMAP || |
| _type_info->type() == FieldType::OLAP_FIELD_TYPE_STRING || |
| _type_info->type() == FieldType::OLAP_FIELD_TYPE_CHAR) { |
| ((Slice*)_mem_value.data())->size = _default_value.length(); |
| ((Slice*)_mem_value.data())->data = _default_value.data(); |
| } else if (_type_info->type() == FieldType::OLAP_FIELD_TYPE_ARRAY) { |
| if (_default_value != "[]") { |
| return Status::NotSupported("Array default {} is unsupported", _default_value); |
| } else { |
| ((Slice*)_mem_value.data())->size = _default_value.length(); |
| ((Slice*)_mem_value.data())->data = _default_value.data(); |
| } |
| } else if (_type_info->type() == FieldType::OLAP_FIELD_TYPE_STRUCT) { |
| return Status::NotSupported("STRUCT default type is unsupported"); |
| } else if (_type_info->type() == FieldType::OLAP_FIELD_TYPE_MAP) { |
| return Status::NotSupported("MAP default type is unsupported"); |
| } else { |
| s = _type_info->from_string(_mem_value.data(), _default_value, _precision, _scale); |
| } |
| if (!s.ok()) { |
| return s; |
| } |
| } |
| } else if (_is_nullable) { |
| // if _has_default_value is false but _is_nullable is true, we should return null as default value. |
| _is_default_value_null = true; |
| } else { |
| return Status::InternalError( |
| "invalid default value column for no default value and not nullable"); |
| } |
| return Status::OK(); |
| } |
| |
| void DefaultValueColumnIterator::insert_default_data(const TypeInfo* type_info, size_t type_size, |
| void* mem_value, |
| vectorized::MutableColumnPtr& dst, size_t n) { |
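| // Default values are materialized as raw values, so a dictionary-encoded |
| // destination is first converted to a plain predicate column. |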
| dst = dst->convert_to_predicate_column_if_dictionary(); |
| |
| switch (type_info->type()) { |
| case FieldType::OLAP_FIELD_TYPE_BITMAP: |
| case FieldType::OLAP_FIELD_TYPE_HLL: { |
| dst->insert_many_defaults(n); |
| break; |
| } |
| case FieldType::OLAP_FIELD_TYPE_DATE: { |
| vectorized::Int64 int64; |
| char* data_ptr = (char*)&int64; |
| size_t data_len = sizeof(int64); |
| |
| assert(type_size == |
| sizeof(FieldTypeTraits<FieldType::OLAP_FIELD_TYPE_DATE>::CppType)); //uint24_t |
| std::string str = FieldTypeTraits<FieldType::OLAP_FIELD_TYPE_DATE>::to_string(mem_value); |
| |
| VecDateTimeValue value; |
| value.from_date_str(str.c_str(), str.length()); |
| value.cast_to_date(); |
| |
| int64 = binary_cast<VecDateTimeValue, vectorized::Int64>(value); |
| dst->insert_data_repeatedly(data_ptr, data_len, n); |
| break; |
| } |
| case FieldType::OLAP_FIELD_TYPE_DATETIME: { |
| vectorized::Int64 int64; |
| char* data_ptr = (char*)&int64; |
| size_t data_len = sizeof(int64); |
| |
| assert(type_size == |
| sizeof(FieldTypeTraits<FieldType::OLAP_FIELD_TYPE_DATETIME>::CppType)); //int64_t |
| std::string str = |
| FieldTypeTraits<FieldType::OLAP_FIELD_TYPE_DATETIME>::to_string(mem_value); |
| |
| VecDateTimeValue value; |
| value.from_date_str(str.c_str(), str.length()); |
| value.to_datetime(); |
| |
| int64 = binary_cast<VecDateTimeValue, vectorized::Int64>(value); |
| dst->insert_data_repeatedly(data_ptr, data_len, n); |
| break; |
| } |
| case FieldType::OLAP_FIELD_TYPE_DECIMAL: { |
| vectorized::Int128 int128; |
| char* data_ptr = (char*)&int128; |
| size_t data_len = sizeof(int128); |
| |
| assert(type_size == |
| sizeof(FieldTypeTraits<FieldType::OLAP_FIELD_TYPE_DECIMAL>::CppType)); //decimal12_t |
| decimal12_t* d = (decimal12_t*)mem_value; |
| int128 = DecimalV2Value(d->integer, d->fraction).value(); |
| dst->insert_data_repeatedly(data_ptr, data_len, n); |
| break; |
| } |
| case FieldType::OLAP_FIELD_TYPE_STRING: |
| case FieldType::OLAP_FIELD_TYPE_VARCHAR: |
| case FieldType::OLAP_FIELD_TYPE_CHAR: |
| case FieldType::OLAP_FIELD_TYPE_JSONB: |
| case FieldType::OLAP_FIELD_TYPE_AGG_STATE: { |
| char* data_ptr = ((Slice*)mem_value)->data; |
| size_t data_len = ((Slice*)mem_value)->size; |
| dst->insert_data_repeatedly(data_ptr, data_len, n); |
| break; |
| } |
| case FieldType::OLAP_FIELD_TYPE_ARRAY: { |
| if (dst->is_nullable()) { |
| static_cast<vectorized::ColumnNullable&>(*dst).insert_not_null_elements(n); |
| } else { |
| dst->insert_many_defaults(n); |
| } |
| break; |
| } |
| case FieldType::OLAP_FIELD_TYPE_VARIANT: { |
| dst->insert_many_defaults(n); |
| break; |
| } |
| default: { |
| char* data_ptr = (char*)mem_value; |
| size_t data_len = type_size; |
| dst->insert_data_repeatedly(data_ptr, data_len, n); |
| } |
| } |
| } |
| |
| Status DefaultValueColumnIterator::next_batch(size_t* n, vectorized::MutableColumnPtr& dst, |
| bool* has_null) { |
| *has_null = _is_default_value_null; |
| _insert_many_default(dst, *n); |
| return Status::OK(); |
| } |
| |
| Status DefaultValueColumnIterator::read_by_rowids(const rowid_t* rowids, const size_t count, |
| vectorized::MutableColumnPtr& dst) { |
| _insert_many_default(dst, count); |
| return Status::OK(); |
| } |
| |
| void DefaultValueColumnIterator::_insert_many_default(vectorized::MutableColumnPtr& dst, size_t n) { |
| if (_is_default_value_null) { |
| dst->insert_many_defaults(n); |
| } else { |
| insert_default_data(_type_info.get(), _type_size, _mem_value.data(), dst, n); |
| } |
| } |
| |
| Status RowIdColumnIteratorV2::next_batch(size_t* n, vectorized::MutableColumnPtr& dst, |
| bool* has_null) { |
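| // Each generated value is the raw byte representation of a |
| // GlobalRowLoacationV2 (version, backend id, file id, row id), stored as |
| // a fixed-width binary string. |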
| auto* string_column = |
| assert_cast<vectorized::ColumnString*, TypeCheckOnRelease::DISABLE>(dst.get()); |
| |
| for (uint32_t i = 0; i < *n; ++i) { |
| uint32_t row_id = _current_rowid + i; |
| GlobalRowLoacationV2 location(_version, _backend_id, _file_id, row_id); |
| string_column->insert_data(reinterpret_cast<const char*>(&location), |
| sizeof(GlobalRowLoacationV2)); |
| } |
| _current_rowid += *n; |
| return Status::OK(); |
| } |
| |
| Status RowIdColumnIteratorV2::read_by_rowids(const rowid_t* rowids, const size_t count, |
| vectorized::MutableColumnPtr& dst) { |
| auto* string_column = assert_cast<vectorized::ColumnString*>(dst.get()); |
| |
| for (size_t i = 0; i < count; ++i) { |
| uint32_t row_id = rowids[i]; |
| GlobalRowLoacationV2 location(_version, _backend_id, _file_id, row_id); |
| string_column->insert_data(reinterpret_cast<const char*>(&location), |
| sizeof(GlobalRowLoacationV2)); |
| } |
| return Status::OK(); |
| } |
| #include "common/compile_check_end.h" |
| |
| } // namespace doris::segment_v2 |