blob: 57a4bc4d74ae1d39fe43013082da04f34d80a8e1 [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <gen_cpp/Descriptors_types.h>
#include <gen_cpp/segment_v2.pb.h>
#include <sys/types.h>
#include <cstddef> // for size_t
#include <cstdint> // for uint32_t
#include <memory> // for unique_ptr
#include <string>
#include <utility>
#include <vector>
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h" // for Status
#include "core/column/column_array.h" // ColumnArray
#include "core/data_type/data_type.h"
#include "io/cache/cached_remote_file_reader.h"
#include "io/fs/file_reader_writer_fwd.h"
#include "io/io_common.h"
#include "storage/index/index_reader.h"
#include "storage/index/ordinal_page_index.h" // for OrdinalPageIndexIterator
#include "storage/index/zone_map/zone_map_index.h"
#include "storage/olap_common.h"
#include "storage/predicate/column_predicate.h"
#include "storage/segment/common.h"
#include "storage/segment/page_handle.h" // for PageHandle
#include "storage/segment/page_pointer.h"
#include "storage/segment/parsed_page.h" // for ParsedPage
#include "storage/segment/segment_prefetcher.h"
#include "storage/segment/stream_reader.h"
#include "storage/tablet/tablet_schema.h"
#include "storage/types.h"
#include "storage/utils.h"
#include "util/once.h"
namespace doris {
#include "common/compile_check_begin.h"
// Forward declarations of doris-level types that the declarations below refer to.
class BlockCompressionCodec;
class AndBlockColumnPredicate;
class ColumnPredicate;
class TabletIndex;
class StorageReadOptions;
namespace io {
class FileReader;
} // namespace io
struct Slice;
struct StringRef;
// Convenience alias for a list of TColumnAccessPath entries.
using TColumnAccessPaths = std::vector<TColumnAccessPath>;
namespace segment_v2 {
// Forward declarations of segment_v2 types that the declarations below refer to.
class EncodingInfo;
class ColumnIterator;
class BloomFilterIndexReader;
class InvertedIndexIterator;
class InvertedIndexReader;
class IndexFileReader;
class PageDecoder;
class RowRanges;
class ZoneMapIndexReader;
class IndexIterator;
class ColumnMetaAccessor;
// Settings supplied when a ColumnReader is created.
struct ColumnReaderOptions {
    // Whether the checksum is verified when a page is read.
    bool verify_checksum {true};
    // For in-memory olap tables: use DURABLE CachePriority in the page cache.
    bool kept_in_memory {false};
    int be_exec_version {-1};
    TabletSchemaSPtr tablet_schema {nullptr};
};
// Per-iterator read settings; ColumnIterator::init() receives one of these.
struct ColumnIteratorOptions {
    bool use_page_cache {false};
    bool is_predicate_column {false};
    // Used for page cache allocation.
    // Page types are divided into DATA_PAGE and INDEX_PAGE;
    // INDEX_PAGE covers index_page, dict_page and short_key_page.
    PageTypePB type {PageTypePB::UNKNOWN_PAGE_TYPE};
    io::FileReader* file_reader {nullptr}; // Ref
    // Reader statistics.
    OlapReaderStatistics* stats {nullptr}; // Ref
    io::IOContext io_ctx;

    // Checks (via CHECK_NOTNULL) that both referenced objects have been set.
    void sanity_check() const {
        CHECK_NOTNULL(file_reader);
        CHECK_NOTNULL(stats);
    }
};
// Forward declarations of the iterator classes defined later in this header
// (ColumnIterator is also forward-declared earlier in this namespace).
class ColumnIterator;
class OffsetFileColumnIterator;
class FileColumnIterator;
// Owning (unique_ptr) and shared (shared_ptr) handles to iterators.
using ColumnIteratorUPtr = std::unique_ptr<ColumnIterator>;
using OffsetFileColumnIteratorUPtr = std::unique_ptr<OffsetFileColumnIterator>;
using FileColumnIteratorUPtr = std::unique_ptr<FileColumnIterator>;
using ColumnIteratorSPtr = std::shared_ptr<ColumnIterator>;
// The same column may be read by several users concurrently, so we do our best
// to reduce resource usage by sharing common information, such as the
// OrdinalPageIndex and page data.
// A ColumnReader caches the data that is shared by all of those readers.
class ColumnReader : public MetadataAdder<ColumnReader>,
public std::enable_shared_from_this<ColumnReader> {
public:
ColumnReader();
// Create an initialized ColumnReader in *reader.
// This should be a lightweight operation without I/O.
static Status create(const ColumnReaderOptions& opts, const ColumnMetaPB& meta,
uint64_t num_rows, const io::FileReaderSPtr& file_reader,
std::shared_ptr<ColumnReader>* reader);
// Type-specific factories; each one stores the new reader in *reader.
// NOTE(review): presumably dispatched to from create() for the matching column type - confirm.
static Status create_array(const ColumnReaderOptions& opts, const ColumnMetaPB& meta,
const io::FileReaderSPtr& file_reader,
std::shared_ptr<ColumnReader>* reader);
static Status create_map(const ColumnReaderOptions& opts, const ColumnMetaPB& meta,
const io::FileReaderSPtr& file_reader,
std::shared_ptr<ColumnReader>* reader);
static Status create_struct(const ColumnReaderOptions& opts, const ColumnMetaPB& meta,
uint64_t num_rows, const io::FileReaderSPtr& file_reader,
std::shared_ptr<ColumnReader>* reader);
static Status create_agg_state(const ColumnReaderOptions& opts, const ColumnMetaPB& meta,
uint64_t num_rows, const io::FileReaderSPtr& file_reader,
std::shared_ptr<ColumnReader>* reader);
// Dictionary-encoding state of the column, see set_dict_encoding_type() / get_dict_encoding_type().
enum DictEncodingType { UNKNOWN_DICT_ENCODING, PARTIAL_DICT_ENCODING, ALL_DICT_ENCODING };
static bool is_compaction_reader_type(ReaderType type);
~ColumnReader() override;
// Create a new column iterator and store it in *iterator.
// The iterator is handed back as a ColumnIteratorUPtr (std::unique_ptr), so the
// caller owns it and does not have to delete it manually.
virtual Status new_iterator(ColumnIteratorUPtr* iterator, const TabletColumn* col,
const StorageReadOptions*);
Status new_iterator(ColumnIteratorUPtr* iterator, const TabletColumn* tablet_column);
Status new_array_iterator(ColumnIteratorUPtr* iterator, const TabletColumn* tablet_column);
Status new_struct_iterator(ColumnIteratorUPtr* iterator, const TabletColumn* tablet_column);
Status new_map_iterator(ColumnIteratorUPtr* iterator, const TabletColumn* tablet_column);
Status new_agg_state_iterator(ColumnIteratorUPtr* iterator);
// Create an index iterator for the index described by index_meta and store it in *iterator.
Status new_index_iterator(const std::shared_ptr<IndexFileReader>& index_file_reader,
const TabletIndex* index_meta,
std::unique_ptr<IndexIterator>* iterator);
Status seek_at_or_before(ordinal_t ordinal, OrdinalPageIndexIterator* iter,
const ColumnIteratorOptions& iter_opts);
Status get_ordinal_index_reader(OrdinalIndexReader*& reader,
OlapReaderStatistics* index_load_stats);
// Read a page from the file into a page handle.
Status read_page(const ColumnIteratorOptions& iter_opts, const PagePointer& pp,
PageHandle* handle, Slice* page_body, PageFooterPB* footer,
BlockCompressionCodec* codec, bool is_dict_page = false) const;
bool is_nullable() const { return _meta_is_nullable; }
const EncodingInfo* encoding_info() const { return _encoding_info; }
// True when a zone map index reader is present for this column.
bool has_zone_map() const { return _zone_map_index != nullptr; }
bool has_bloom_filter_index(bool ngram) const;
// Check whether this column could match `col_predicates` using the segment zone map.
// Since the segment zone map is stored in metadata, this function is fast and needs no I/O.
// Sets *matched to true if the segment zone map is absent or the predicates could be
// satisfied, false otherwise.
Status match_condition(const AndBlockColumnPredicate* col_predicates, bool* matched) const;
Status next_batch_of_zone_map(size_t* n, MutableColumnPtr& dst) const;
// Get row ranges with the zone map.
// - col_predicates is the user's query predicate
// - delete_predicates are the delete predicates (a delete predicate belongs to one version)
Status get_row_ranges_by_zone_map(
const AndBlockColumnPredicate* col_predicates,
const std::vector<std::shared_ptr<const ColumnPredicate>>* delete_predicates,
RowRanges* row_ranges, const ColumnIteratorOptions& iter_opts);
// Get row ranges with the bloom filter index.
Status get_row_ranges_by_bloom_filter(const AndBlockColumnPredicate* col_predicates,
RowRanges* row_ranges,
const ColumnIteratorOptions& iter_opts);
PagePointer get_dict_page_pointer() const { return _meta_dict_page; }
// True when the column holds no rows.
bool is_empty() const { return _num_rows == 0; }
Status prune_predicates_by_zone_map(std::vector<std::shared_ptr<ColumnPredicate>>& predicates,
const int column_id, bool* pruned) const;
CompressionTypePB get_compression() const { return _meta_compression; }
uint64_t num_rows() const { return _num_rows; }
// Records the dictionary encoding type through _set_dict_encoding_type_once.
// NOTE(review): presumably only the first call takes effect (call-once semantics of
// DorisCallOnce) - confirm. The Status returned by call() is deliberately discarded.
void set_dict_encoding_type(DictEncodingType type) {
static_cast<void>(_set_dict_encoding_type_once.call([&] {
_dict_encoding_type = type;
return Status::OK();
}));
}
// NOTE(review): this getter and get_vec_data_type() do not modify the reader and
// could be const-qualified.
DictEncodingType get_dict_encoding_type() { return _dict_encoding_type; }
// Turns off use of the index page cache for this reader.
void disable_index_meta_cache() { _use_index_page_cache = false; }
DataTypePtr get_vec_data_type() { return _data_type; }
virtual FieldType get_meta_type() { return _meta_type; }
int64_t get_metadata_size() const override;
#ifdef BE_TEST
void check_data_by_zone_map_for_test(const MutableColumnPtr& dst) const;
#endif
private:
friend class VariantColumnReader;
friend class FileColumnIterator;
friend class SegmentPrefetcher;
ColumnReader(const ColumnReaderOptions& opts, const ColumnMetaPB& meta, uint64_t num_rows,
io::FileReaderSPtr file_reader);
Status init(const ColumnMetaPB* meta);
// Index loaders. The results must be checked ([[nodiscard]]).
// NOTE(review): presumably serialized through _load_index_lock - confirm in the implementation.
[[nodiscard]] Status _load_zone_map_index(bool use_page_cache, bool kept_in_memory,
const ColumnIteratorOptions& iter_opts);
[[nodiscard]] Status _load_ordinal_index(bool use_page_cache, bool kept_in_memory,
const ColumnIteratorOptions& iter_opts);
[[nodiscard]] Status _load_index(const std::shared_ptr<IndexFileReader>& index_file_reader,
const TabletIndex* index_meta);
[[nodiscard]] Status _load_bloom_filter_index(bool use_page_cache, bool kept_in_memory,
const ColumnIteratorOptions& iter_opts);
bool _zone_map_match_condition(const segment_v2::ZoneMap& zone_map,
const AndBlockColumnPredicate* col_predicates) const;
Status _get_filtered_pages(
const AndBlockColumnPredicate* col_predicates,
const std::vector<std::shared_ptr<const ColumnPredicate>>* delete_predicates,
std::vector<uint32_t>* page_indexes, const ColumnIteratorOptions& iter_opts);
Status _calculate_row_ranges(const std::vector<uint32_t>& page_indexes, RowRanges* row_ranges,
const ColumnIteratorOptions& iter_opts);
// NOTE(review): the members below that have no default initializer rely on the
// constructors (defined outside this header) to set them - confirm for the
// default constructor.
int64_t _meta_length;
FieldType _meta_type;
FieldType _meta_children_column_type;
bool _meta_is_nullable;
bool _use_index_page_cache;
int _be_exec_version = -1;
PagePointer _meta_dict_page;
CompressionTypePB _meta_compression;
ColumnReaderOptions _opts;
uint64_t _num_rows;
io::FileReaderSPtr _file_reader;
DictEncodingType _dict_encoding_type;
DataTypePtr _data_type;
TypeInfoPtr _type_info =
TypeInfoPtr(nullptr,
nullptr); // initialized in init(), may be changed by subclasses.
const EncodingInfo* _encoding_info =
nullptr; // initialized in init(), used to create the PageDecoder
// meta for various column indexes (null if the index is absent)
std::unique_ptr<ZoneMapPB> _segment_zone_map;
mutable std::shared_mutex _load_index_lock;
std::unique_ptr<ZoneMapIndexReader> _zone_map_index;
std::unique_ptr<OrdinalIndexReader> _ordinal_index;
std::shared_ptr<BloomFilterIndexReader> _bloom_filter_index;
std::unordered_map<int64_t, IndexReaderPtr> _index_readers;
std::vector<std::shared_ptr<ColumnReader>> _sub_readers;
// Guards the one-time assignment done in set_dict_encoding_type().
DorisCallOnce<Status> _set_dict_encoding_type_once;
};
// Base class of the iterators that read the data of one column.
//
// Subclasses must implement seek_to_ordinal() and get_current_ordinal(). The
// other virtual functions have defaults that either report NotSupported or
// succeed without doing anything.
class ColumnIterator {
public:
    ColumnIterator() = default;
    virtual ~ColumnIterator() = default;

    // Stores a copy of `opts` in _opts.
    virtual Status init(const ColumnIteratorOptions& opts) {
        _opts = opts;
        return Status::OK();
    }

    // Seek to the given ordinal entry in the column.
    // Entry 0 is the first entry written to the column.
    // The outcome is reported through the returned Status (this function does not
    // return a bool); a seek point past the end of the file is expected to be
    // reported that way by the implementation.
    virtual Status seek_to_ordinal(ordinal_t ord) = 0;

    // Convenience overload for callers that do not need the has_null output.
    Status next_batch(size_t* n, MutableColumnPtr& dst) {
        // Initialized so that an implementation which never writes *has_null
        // does not leave an indeterminate value behind.
        bool has_null = false;
        return next_batch(n, dst, &has_null);
    }

    // Reads the next batch of values into `dst`.
    // NOTE(review): `n` is presumably rows requested on entry / rows read on
    // exit - confirm against the implementations.
    virtual Status next_batch(size_t* n, MutableColumnPtr& dst, bool* has_null) {
        return Status::NotSupported("next_batch not implement");
    }

    virtual Status next_batch_of_zone_map(size_t* n, MutableColumnPtr& dst) {
        return Status::NotSupported("next_batch_of_zone_map not implement");
    }

    // Reads the `count` rows identified by `rowids` into `dst`.
    virtual Status read_by_rowids(const rowid_t* rowids, const size_t count,
                                  MutableColumnPtr& dst) {
        return Status::NotSupported("read_by_rowids not implement");
    }

    virtual ordinal_t get_current_ordinal() const = 0;

    // Row-range pruning hooks. The defaults leave `row_ranges` untouched and
    // report success.
    virtual Status get_row_ranges_by_zone_map(
            const AndBlockColumnPredicate* col_predicates,
            const std::vector<std::shared_ptr<const ColumnPredicate>>* delete_predicates,
            RowRanges* row_ranges) {
        return Status::OK();
    }

    virtual Status get_row_ranges_by_bloom_filter(const AndBlockColumnPredicate* col_predicates,
                                                  RowRanges* row_ranges) {
        return Status::OK();
    }

    virtual Status get_row_ranges_by_dict(const AndBlockColumnPredicate* col_predicates,
                                          RowRanges* row_ranges) {
        return Status::OK();
    }

    virtual bool is_all_dict_encoding() const { return false; }

    // Default handling of access paths: a non-empty predicate path list marks
    // this iterator as READING_FOR_PREDICATE; `all_access_paths` is not inspected.
    virtual Status set_access_paths(const TColumnAccessPaths& all_access_paths,
                                    const TColumnAccessPaths& predicate_access_paths) {
        if (!predicate_access_paths.empty()) {
            _reading_flag = ReadingFlag::READING_FOR_PREDICATE;
        }
        return Status::OK();
    }

    void set_column_name(const std::string& column_name) { _column_name = column_name; }

    const std::string& column_name() const { return _column_name; }

    // Since there may be multiple paths with conflicts or overlaps,
    // we need to define several reading flags:
    //
    // NORMAL_READING — Default value, indicating that the column should be read.
    // SKIP_READING — The column should not be read.
    // NEED_TO_READ — The column must be read.
    // READING_FOR_PREDICATE — The column is required for predicate evaluation.
    //
    // For example, suppose there are two paths:
    // - Path 1 specifies that column A needs to be read, so it is marked as NEED_TO_READ.
    // - Path 2 specifies that the column should not be read, but since it is already marked as NEED_TO_READ,
    //   it should not be changed to SKIP_READING.
    //
    // The enumerator order matters: set_reading_flag() compares the underlying values.
    enum class ReadingFlag : int {
        NORMAL_READING,
        SKIP_READING,
        NEED_TO_READ,
        READING_FOR_PREDICATE
    };

    // Raises the reading flag; a flag with a lower underlying value than the
    // current one is ignored.
    void set_reading_flag(ReadingFlag flag) {
        if (static_cast<int>(flag) > static_cast<int>(_reading_flag)) {
            _reading_flag = flag;
        }
    }

    ReadingFlag reading_flag() const { return _reading_flag; }

    virtual void set_need_to_read() { set_reading_flag(ReadingFlag::NEED_TO_READ); }

    virtual void remove_pruned_sub_iterators() {}

    virtual Status init_prefetcher(const SegmentPrefetchParams& params) { return Status::OK(); }

    virtual void collect_prefetchers(
            std::map<PrefetcherInitMethod, std::vector<SegmentPrefetcher*>>& prefetchers,
            PrefetcherInitMethod init_method) {}

protected:
    Result<TColumnAccessPaths> _get_sub_access_paths(const TColumnAccessPaths& access_paths);

    ColumnIteratorOptions _opts;
    ReadingFlag _reading_flag {ReadingFlag::NORMAL_READING};
    std::string _column_name;
};
// This iterator is used to read column data from file
// for scalar type
class FileColumnIterator final : public ColumnIterator {
public:
explicit FileColumnIterator(std::shared_ptr<ColumnReader> reader);
~FileColumnIterator() override;
Status init(const ColumnIteratorOptions& opts) override;
Status seek_to_ordinal(ordinal_t ord) override;
Status seek_to_page_start();
Status next_batch(size_t* n, MutableColumnPtr& dst, bool* has_null) override;
Status next_batch_of_zone_map(size_t* n, MutableColumnPtr& dst) override;
Status read_by_rowids(const rowid_t* rowids, const size_t count,
MutableColumnPtr& dst) override;
ordinal_t get_current_ordinal() const override { return _current_ordinal; }
// get row ranges by zone map
// - cond_column is user's query predicate
// - delete_condition is delete predicate of one version
Status get_row_ranges_by_zone_map(
const AndBlockColumnPredicate* col_predicates,
const std::vector<std::shared_ptr<const ColumnPredicate>>* delete_predicates,
RowRanges* row_ranges) override;
Status get_row_ranges_by_bloom_filter(const AndBlockColumnPredicate* col_predicates,
RowRanges* row_ranges) override;
Status get_row_ranges_by_dict(const AndBlockColumnPredicate* col_predicates,
RowRanges* row_ranges) override;
ParsedPage* get_current_page() { return &_page; }
bool is_nullable() { return _reader->is_nullable(); }
bool is_all_dict_encoding() const override { return _is_all_dict_encoding; }
Status init_prefetcher(const SegmentPrefetchParams& params) override;
void collect_prefetchers(
std::map<PrefetcherInitMethod, std::vector<SegmentPrefetcher*>>& prefetchers,
PrefetcherInitMethod init_method) override;
private:
Status _seek_to_pos_in_page(ParsedPage* page, ordinal_t offset_in_page) const;
Status _load_next_page(bool* eos);
Status _read_data_page(const OrdinalPageIndexIterator& iter);
Status _read_dict_data();
void _trigger_prefetch_if_eligible(ordinal_t ord);
std::shared_ptr<ColumnReader> _reader = nullptr;
// iterator owned compress codec, should NOT be shared by threads, initialized in init()
BlockCompressionCodec* _compress_codec = nullptr;
// 1. The _page represents current page.
// 2. We define an operation is one seek and following read,
// If new seek is issued, the _page will be reset.
ParsedPage _page;
// keep dict page decoder
std::unique_ptr<PageDecoder> _dict_decoder;
// keep dict page handle to avoid released
PageHandle _dict_page_handle;
// page iterator used to get next page when current page is finished.
// This value will be reset when a new seek is issued
OrdinalPageIndexIterator _page_iter;
// current value ordinal
ordinal_t _current_ordinal = 0;
bool _is_all_dict_encoding = false;
std::unique_ptr<StringRef[]> _dict_word_info;
bool _enable_prefetch {false};
std::unique_ptr<SegmentPrefetcher> _prefetcher;
std::shared_ptr<io::CachedRemoteFileReader> _cached_remote_file_reader {nullptr};
};
// Iterator with nothing to read: every seek succeeds and the current ordinal
// is always 0. All other operations keep the ColumnIterator defaults.
class EmptyFileColumnIterator final : public ColumnIterator {
public:
    ordinal_t get_current_ordinal() const override { return 0; }

    Status seek_to_ordinal(ordinal_t ord) override { return Status::OK(); }
};
// This iterator make offset operation write once for
class OffsetFileColumnIterator final : public ColumnIterator {
public:
explicit OffsetFileColumnIterator(FileColumnIteratorUPtr offset_reader) {
_offset_iterator = std::move(offset_reader);
}
~OffsetFileColumnIterator() override = default;
Status init(const ColumnIteratorOptions& opts) override;
Status next_batch(size_t* n, MutableColumnPtr& dst, bool* has_null) override;
ordinal_t get_current_ordinal() const override {
return _offset_iterator->get_current_ordinal();
}
Status seek_to_ordinal(ordinal_t ord) override {
RETURN_IF_ERROR(_offset_iterator->seek_to_ordinal(ord));
return Status::OK();
}
Status _peek_one_offset(ordinal_t* offset);
Status _calculate_offsets(ssize_t start, ColumnArray::ColumnOffsets& column_offsets);
Status read_by_rowids(const rowid_t* rowids, const size_t count,
MutableColumnPtr& dst) override {
return _offset_iterator->read_by_rowids(rowids, count, dst);
}
Status init_prefetcher(const SegmentPrefetchParams& params) override;
void collect_prefetchers(
std::map<PrefetcherInitMethod, std::vector<SegmentPrefetcher*>>& prefetchers,
PrefetcherInitMethod init_method) override;
private:
std::unique_ptr<FileColumnIterator> _offset_iterator;
// reuse a tiny column for peek to avoid frequent allocations
MutableColumnPtr _peek_tmp_col;
};
// Iterator used to read a map column. It is composed of a null iterator, an
// offsets iterator, and one iterator each for the keys and the values.
class MapFileColumnIterator final : public ColumnIterator {
public:
    explicit MapFileColumnIterator(std::shared_ptr<ColumnReader> reader,
                                   ColumnIteratorUPtr null_iterator,
                                   OffsetFileColumnIteratorUPtr offsets_iterator,
                                   ColumnIteratorUPtr key_iterator,
                                   ColumnIteratorUPtr val_iterator);

    ~MapFileColumnIterator() override = default;

    Status init(const ColumnIteratorOptions& opts) override;

    Status next_batch(size_t* n, MutableColumnPtr& dst, bool* has_null) override;

    Status read_by_rowids(const rowid_t* rowids, const size_t count,
                          MutableColumnPtr& dst) override;

    Status seek_to_ordinal(ordinal_t ord) override;

    // The position of the map column is the position of its offsets iterator.
    ordinal_t get_current_ordinal() const override {
        return _offsets_iterator->get_current_ordinal();
    }

    Status init_prefetcher(const SegmentPrefetchParams& params) override;
    void collect_prefetchers(
            std::map<PrefetcherInitMethod, std::vector<SegmentPrefetcher*>>& prefetchers,
            PrefetcherInitMethod init_method) override;

    Status set_access_paths(const TColumnAccessPaths& all_access_paths,
                            const TColumnAccessPaths& predicate_access_paths) override;

    void set_need_to_read() override;

    void remove_pruned_sub_iterators() override;

private:
    std::shared_ptr<ColumnReader> _map_reader {nullptr};
    ColumnIteratorUPtr _null_iterator;
    OffsetFileColumnIteratorUPtr _offsets_iterator; // OffsetFileColumnIterator
    ColumnIteratorUPtr _key_iterator;
    ColumnIteratorUPtr _val_iterator;
};
// Iterator used to read a struct column. It is composed of a null iterator and
// one iterator per sub column.
class StructFileColumnIterator final : public ColumnIterator {
public:
    explicit StructFileColumnIterator(std::shared_ptr<ColumnReader> reader,
                                      ColumnIteratorUPtr null_iterator,
                                      std::vector<ColumnIteratorUPtr>&& sub_column_iterators);

    ~StructFileColumnIterator() override = default;

    Status init(const ColumnIteratorOptions& opts) override;

    Status next_batch(size_t* n, MutableColumnPtr& dst, bool* has_null) override;

    Status read_by_rowids(const rowid_t* rowids, const size_t count,
                          MutableColumnPtr& dst) override;

    Status seek_to_ordinal(ordinal_t ord) override;

    // The position of the struct column is the position of its first sub column
    // iterator, so at least one sub column iterator must be present when this
    // is called.
    ordinal_t get_current_ordinal() const override {
        return _sub_column_iterators[0]->get_current_ordinal();
    }

    Status set_access_paths(const TColumnAccessPaths& all_access_paths,
                            const TColumnAccessPaths& predicate_access_paths) override;

    void set_need_to_read() override;

    void remove_pruned_sub_iterators() override;

    Status init_prefetcher(const SegmentPrefetchParams& params) override;
    void collect_prefetchers(
            std::map<PrefetcherInitMethod, std::vector<SegmentPrefetcher*>>& prefetchers,
            PrefetcherInitMethod init_method) override;

private:
    std::shared_ptr<ColumnReader> _struct_reader {nullptr};
    ColumnIteratorUPtr _null_iterator;
    std::vector<ColumnIteratorUPtr> _sub_column_iterators;
};
// Iterator used to read an array column. It is composed of an offsets iterator,
// a null iterator and an iterator over the array items.
class ArrayFileColumnIterator final : public ColumnIterator {
public:
    explicit ArrayFileColumnIterator(std::shared_ptr<ColumnReader> reader,
                                     OffsetFileColumnIteratorUPtr offset_reader,
                                     ColumnIteratorUPtr item_iterator,
                                     ColumnIteratorUPtr null_iterator);

    ~ArrayFileColumnIterator() override = default;

    Status init(const ColumnIteratorOptions& opts) override;

    Status next_batch(size_t* n, MutableColumnPtr& dst, bool* has_null) override;

    Status read_by_rowids(const rowid_t* rowids, const size_t count,
                          MutableColumnPtr& dst) override;

    Status seek_to_ordinal(ordinal_t ord) override;

    // The position of the array column is the position of its offsets iterator.
    ordinal_t get_current_ordinal() const override {
        return _offset_iterator->get_current_ordinal();
    }

    Status set_access_paths(const TColumnAccessPaths& all_access_paths,
                            const TColumnAccessPaths& predicate_access_paths) override;

    void set_need_to_read() override;

    void remove_pruned_sub_iterators() override;

    Status init_prefetcher(const SegmentPrefetchParams& params) override;
    void collect_prefetchers(
            std::map<PrefetcherInitMethod, std::vector<SegmentPrefetcher*>>& prefetchers,
            PrefetcherInitMethod init_method) override;

private:
    Status _seek_by_offsets(ordinal_t ord);

    std::shared_ptr<ColumnReader> _array_reader {nullptr};
    OffsetFileColumnIteratorUPtr _offset_iterator;
    ColumnIteratorUPtr _null_iterator;
    ColumnIteratorUPtr _item_iterator;
};
// Iterator that reads no stored data: for every requested row it appends a
// GlobalRowLoacation built from the tablet id, rowset id and segment id given
// at construction plus the row id.
class RowIdColumnIterator : public ColumnIterator {
public:
    RowIdColumnIterator() = delete;
    RowIdColumnIterator(int64_t tid, RowsetId rid, int32_t segid)
            : _tablet_id(tid), _rowset_id(rid), _segment_id(segid) {}

    // Positions the iterator on row `ord_idx` (converted to a 32-bit row id
    // through cast_set).
    Status seek_to_ordinal(ordinal_t ord_idx) override {
        _current_rowid = cast_set<uint32_t>(ord_idx);
        return Status::OK();
    }

    // Convenience overload for callers that do not need the has_null output.
    Status next_batch(size_t* n, MutableColumnPtr& dst) {
        // The three-argument overload below never writes *has_null, so give the
        // local a defined value instead of leaving it indeterminate.
        bool has_null = false;
        return next_batch(n, dst, &has_null);
    }

    // Appends the locations of the next *n rows, starting at the current row
    // id, to `dst` and advances the current row id by *n.
    // `has_null` is not written by this implementation.
    Status next_batch(size_t* n, MutableColumnPtr& dst, bool* has_null) override {
        for (size_t i = 0; i < *n; ++i) {
            const auto row_id = cast_set<uint32_t>(_current_rowid + i);
            GlobalRowLoacation location(_tablet_id, _rowset_id, _segment_id, row_id);
            dst->insert_data(reinterpret_cast<const char*>(&location), sizeof(GlobalRowLoacation));
        }
        _current_rowid += *n;
        return Status::OK();
    }

    // Appends the locations of the `count` rows listed in `rowids` to `dst`.
    // The current row id is left unchanged.
    Status read_by_rowids(const rowid_t* rowids, const size_t count,
                          MutableColumnPtr& dst) override {
        for (size_t i = 0; i < count; ++i) {
            rowid_t row_id = rowids[i];
            GlobalRowLoacation location(_tablet_id, _rowset_id, _segment_id, row_id);
            dst->insert_data(reinterpret_cast<const char*>(&location), sizeof(GlobalRowLoacation));
        }
        return Status::OK();
    }

    ordinal_t get_current_ordinal() const override { return _current_rowid; }

private:
    rowid_t _current_rowid = 0;
    int64_t _tablet_id = 0;
    RowsetId _rowset_id;
    int32_t _segment_id = 0;
};
// Second version of the row-id iterator. Instead of tablet/rowset/segment ids
// it is constructed from a version byte, a backend id and a file id.
// NOTE(review): the produced value layout is defined by next_batch() and
// read_by_rowids(), which are implemented outside this header.
class RowIdColumnIteratorV2 : public ColumnIterator {
public:
    RowIdColumnIteratorV2(uint8_t version, int64_t backend_id, uint32_t file_id)
            : _version(version), _backend_id(backend_id), _file_id(file_id) {}

    // Positions the iterator on row `ord_idx` (converted to a 32-bit row id
    // through cast_set).
    Status seek_to_ordinal(ordinal_t ord_idx) override {
        _current_rowid = cast_set<uint32_t>(ord_idx);
        return Status::OK();
    }

    // Convenience overload for callers that do not need the has_null output.
    Status next_batch(size_t* n, MutableColumnPtr& dst) {
        // Initialized so that the local never holds an indeterminate value,
        // whether or not the three-argument overload writes it.
        bool has_null = false;
        return next_batch(n, dst, &has_null);
    }

    Status next_batch(size_t* n, MutableColumnPtr& dst, bool* has_null) override;

    Status read_by_rowids(const rowid_t* rowids, const size_t count,
                          MutableColumnPtr& dst) override;

    ordinal_t get_current_ordinal() const override { return _current_rowid; }

private:
    uint32_t _current_rowid = 0;
    uint8_t _version;
    int64_t _backend_id;
    uint32_t _file_id;
};
// This iterator is used to read a default value column. It holds the default
// value (as a string) and the column's type description instead of a
// ColumnReader.
class DefaultValueColumnIterator : public ColumnIterator {
public:
    DefaultValueColumnIterator(bool has_default_value, std::string default_value, bool is_nullable,
                               TypeInfoPtr type_info, int precision, int scale, int len)
            : _has_default_value(has_default_value),
              _default_value(std::move(default_value)),
              _is_nullable(is_nullable),
              _type_info(std::move(type_info)),
              _precision(precision),
              _scale(scale),
              _len(len) {}

    Status init(const ColumnIteratorOptions& opts) override;

    // Seeking only records the requested position.
    Status seek_to_ordinal(ordinal_t ord_idx) override {
        _current_rowid = ord_idx;
        return Status::OK();
    }

    // Convenience overload for callers that do not need the has_null output.
    Status next_batch(size_t* n, MutableColumnPtr& dst) {
        // Initialized so that the local never holds an indeterminate value,
        // whether or not the three-argument overload writes it.
        bool has_null = false;
        return next_batch(n, dst, &has_null);
    }

    Status next_batch(size_t* n, MutableColumnPtr& dst, bool* has_null) override;

    // Zone-map reads are served exactly like ordinary batch reads.
    Status next_batch_of_zone_map(size_t* n, MutableColumnPtr& dst) override {
        return next_batch(n, dst);
    }

    Status read_by_rowids(const rowid_t* rowids, const size_t count,
                          MutableColumnPtr& dst) override;

    ordinal_t get_current_ordinal() const override { return _current_rowid; }

private:
    void _insert_many_default(MutableColumnPtr& dst, size_t n);

    bool _has_default_value;
    std::string _default_value;
    bool _is_nullable;
    TypeInfoPtr _type_info;
    int _precision;
    int _scale;
    const int _len;
    Field _default_value_field;

    // current rowid
    ordinal_t _current_rowid = 0;
};
} // namespace segment_v2
#include "common/compile_check_end.h"
} // namespace doris