// blob: 60327d3619445a2e4a0fc51f37f2d71b0001df86 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "olap/rowset/segment_v2/indexed_column_reader.h"
#include <gen_cpp/segment_v2.pb.h>
#include <algorithm>
#include "common/status.h"
#include "io/io_common.h"
#include "olap/key_coder.h"
#include "olap/olap_common.h"
#include "olap/rowset/segment_v2/encoding_info.h" // for EncodingInfo
#include "olap/rowset/segment_v2/options.h"
#include "olap/rowset/segment_v2/page_decoder.h"
#include "olap/rowset/segment_v2/page_io.h"
#include "olap/types.h"
#include "util/block_compression.h"
#include "util/bvar_helper.h"
namespace doris {
using namespace ErrorCode;
namespace segment_v2 {
// File-local bvar metrics for indexed-column reads, all registered under the
// "doris_pk" prefix. Each PerSecond variable samples the Adder it is given a
// pointer to, using a window argument of 60.

// Uncompressed page bytes (taken from the page footer) and on-disk page bytes
// (taken from the page pointer size).
static bvar::Adder<uint64_t> g_index_reader_bytes("doris_pk", "index_reader_bytes");
static bvar::Adder<uint64_t> g_index_reader_compressed_bytes("doris_pk",
                                                             "index_reader_compressed_bytes");
static bvar::PerSecond<bvar::Adder<uint64_t>> g_index_reader_bytes_per_second(
        "doris_pk", "index_reader_bytes_per_second", &g_index_reader_bytes, 60);

// Number of pages read through IndexedColumnReader::read_page.
static bvar::Adder<uint64_t> g_index_reader_pages("doris_pk", "index_reader_pages");
static bvar::PerSecond<bvar::Adder<uint64_t>> g_index_reader_pages_per_second(
        "doris_pk", "index_reader_pages_per_second", &g_index_reader_pages, 60);

// Number of page reads that were reported as cached by the reader statistics.
static bvar::Adder<uint64_t> g_index_reader_cached_pages("doris_pk", "index_reader_cached_pages");
static bvar::PerSecond<bvar::Adder<uint64_t>> g_index_reader_cached_pages_per_second(
        "doris_pk", "index_reader_cached_pages_per_second", &g_index_reader_cached_pages, 60);

// Number of value seeks issued through IndexedColumnIterator::seek_at_or_after.
static bvar::Adder<uint64_t> g_index_reader_seek_count("doris_pk", "index_reader_seek_count");
static bvar::PerSecond<bvar::Adder<uint64_t>> g_index_reader_seek_per_second(
        "doris_pk", "index_reader_seek_per_second", &g_index_reader_seek_count, 60);

// Counter for primary-key index pages. The per-second variable is named after
// what it actually samples (pages, not bytes); the exposed metric name is
// unchanged.
static bvar::Adder<uint64_t> g_index_reader_pk_pages("doris_pk", "index_reader_pk_pages");
static bvar::PerSecond<bvar::Adder<uint64_t>> g_index_reader_pk_pages_per_second(
        "doris_pk", "index_reader_pk_pages_per_second", &g_index_reader_pk_pages, 60);
// Reports the metadata footprint of this reader: the size of the reader
// object itself plus whatever `_meta.ByteSizeLong()` reports for the meta
// message.
int64_t IndexedColumnReader::get_metadata_size() const {
    const auto meta_bytes = _meta.ByteSizeLong();
    return sizeof(IndexedColumnReader) + meta_bytes;
}
// Prepares the reader for use.
//
// Records the caching flags, resolves the type info / encoding info / key
// coder for the column's data type, and then, for each of the ordinal and
// value indexes present in `_meta`, either remembers the single data page
// (when the index root *is* the data page) or loads and parses the root index
// page.
//
// @param use_page_cache    stored in `_use_page_cache` and used by later page reads.
// @param kept_in_memory    stored in `_kept_in_memory` and used by later page reads.
// @param index_load_stats  forwarded to load_index_page(); may be null.
// @return NotSupported when no scalar type info exists for the data type,
//         any error from EncodingInfo::get() or load_index_page(), OK otherwise.
Status IndexedColumnReader::load(bool use_page_cache, bool kept_in_memory,
                                 OlapReaderStatistics* index_load_stats) {
    _use_page_cache = use_page_cache;
    _kept_in_memory = kept_in_memory;

    _type_info = get_scalar_type_info(static_cast<FieldType>(_meta.data_type()));
    if (_type_info == nullptr) {
        return Status::NotSupported("unsupported typeinfo, type={}", _meta.data_type());
    }

    const auto field_type = _type_info->type();
    RETURN_IF_ERROR(EncodingInfo::get(field_type, _meta.encoding(), {}, &_encoding_info));
    _value_key_coder = get_key_coder(_type_info->type());

    // Ordinal index: either the root points straight at the only data page,
    // or it is a real index page that has to be read and parsed.
    if (_meta.has_ordinal_index_meta()) {
        const auto& ordinal_meta = _meta.ordinal_index_meta();
        if (!ordinal_meta.is_root_data_page()) {
            RETURN_IF_ERROR(load_index_page(ordinal_meta.root_page(),
                                            &_ordinal_index_page_handle,
                                            _ordinal_index_reader.get(), index_load_stats));
            _has_index_page = true;
        } else {
            _sole_data_page = PagePointer(ordinal_meta.root_page());
        }
    }

    // Value index: handled the same way as the ordinal index above.
    if (_meta.has_value_index_meta()) {
        const auto& value_meta = _meta.value_index_meta();
        if (!value_meta.is_root_data_page()) {
            RETURN_IF_ERROR(load_index_page(value_meta.root_page(), &_value_index_page_handle,
                                            _value_index_reader.get(), index_load_stats));
            _has_index_page = true;
        } else {
            _sole_data_page = PagePointer(value_meta.root_page());
        }
    }

    _num_values = _meta.num_values();
    update_metadata_size();
    return Status::OK();
}
// Reads the index page located at `pp`, hands its body and footer to
// `reader` for parsing, and adds the body size to `_mem_size`.
//
// @param pp                protobuf page pointer of the index page.
// @param handle            receives the page handle from read_page().
// @param reader            index page reader that parses the page body.
// @param index_load_stats  forwarded to read_page(); may be null.
// @return the first error from codec lookup, page read or parsing; OK otherwise.
Status IndexedColumnReader::load_index_page(const PagePointerPB& pp, PageHandle* handle,
                                            IndexPageReader* reader,
                                            OlapReaderStatistics* index_load_stats) {
    BlockCompressionCodec* codec = nullptr;
    RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), &codec));

    Slice page_body;
    PageFooterPB page_footer;
    const PagePointer page_ptr(pp);
    // Index pages are read without pre-decoding.
    RETURN_IF_ERROR(read_page(page_ptr, handle, &page_body, &page_footer, INDEX_PAGE, codec,
                              false, index_load_stats));

    RETURN_IF_ERROR(reader->parse(page_body, page_footer.index_page_footer()));
    _mem_size += page_body.get_size();
    return Status::OK();
}
// Reads (and decompresses) a single page through PageIO and updates the
// file-local read metrics.
//
// @param pp          location of the page to read.
// @param handle      receives the page handle.
// @param body        receives the page body.
// @param footer      receives the parsed page footer.
// @param type        requested page type; replaced by PRIMARY_KEY_INDEX_PAGE
//                    when this reader is flagged as a primary-key index.
// @param codec       compression codec passed to PageIO.
// @param pre_decode  forwarded to PageReadOptions::pre_decode.
// @param stats       statistics sink; when null a local scratch object is used.
// @return the status returned by PageIO::read_and_decompress_page().
Status IndexedColumnReader::read_page(const PagePointer& pp, PageHandle* handle, Slice* body,
                                      PageFooterPB* footer, PageTypePB type,
                                      BlockCompressionCodec* codec, bool pre_decode,
                                      OlapReaderStatistics* stats) const {
    // Always hand PageIO a valid statistics object, even if the caller has none.
    OlapReaderStatistics tmp_stats;
    OlapReaderStatistics* stats_ptr = stats != nullptr ? stats : &tmp_stats;

    PageReadOptions opts(io::IOContext {.is_index_data = true,
                                        .file_cache_stats = &stats_ptr->file_cache_stats});
    opts.use_page_cache = _use_page_cache;
    opts.kept_in_memory = _kept_in_memory;
    opts.pre_decode = pre_decode;
    opts.type = type;
    opts.file_reader = _file_reader.get();
    opts.page_pointer = pp;
    opts.codec = codec;
    opts.stats = stats_ptr;
    opts.encoding_info = _encoding_info;
    if (_is_pk_index) {
        opts.type = PRIMARY_KEY_INDEX_PAGE;
    }

    // The statistics object may be shared across many reads, so only the
    // change caused by this read is attributed to the cached-pages metric.
    // Reading `tmp_stats` directly would miss every cache hit whenever the
    // caller supplied its own `stats`.
    const auto cached_pages_before = stats_ptr->cached_pages_num;

    auto st = PageIO::read_and_decompress_page(opts, handle, body, footer);

    g_index_reader_compressed_bytes << pp.size;
    g_index_reader_bytes << footer->uncompressed_size();
    g_index_reader_pages << 1;
    g_index_reader_cached_pages << (stats_ptr->cached_pages_num - cached_pages_before);
    if (_is_pk_index) {
        // Feed the primary-key page counter; nothing else in this file does.
        g_index_reader_pk_pages << 1;
    }
    return st;
}
// Destructor is defaulted here, out of line, in the implementation file.
IndexedColumnReader::~IndexedColumnReader() = default;
///////////////////////////////////////////////////////////////////////////////
// Reads the data page at `pp` and installs it as the iterator's current
// parsed page (`_data_page`).
//
// @param pp  location of the data page to read.
// @return the first error from codec lookup, page read or page parsing;
//         OK otherwise.
Status IndexedColumnIterator::_read_data_page(const PagePointer& pp) {
    // IndexedColumnIterator has no init(), so the codec is resolved lazily here.
    if (!_compress_codec) {
        RETURN_IF_ERROR(get_block_compression_codec(_reader->get_compression(), &_compress_codec));
    }

    PageHandle handle;
    Slice body;
    PageFooterPB footer;
    RETURN_IF_ERROR(_reader->read_page(pp, &handle, &body, &footer, DATA_PAGE, _compress_codec,
                                       true, _stats));

    // Parse the data page.
    // page_index is not used by IndexedColumnIterator, so 0 is passed.
    PageDecoderOptions opts;
    opts.need_check_bitmap = false;
    // Propagate a parse failure before inspecting `_data_page`: after a failed
    // create() its contents are not known to be valid, and the consistency
    // check below must not abort a debug build in place of returning the error.
    RETURN_IF_ERROR(ParsedPage::create(std::move(handle), body, footer.data_page_footer(),
                                       _reader->encoding_info(), pp, 0, &_data_page, opts));

    // When the ordinal index root is the data page itself, that single page
    // must hold every value of the column.
    DCHECK(_reader->_meta.ordinal_index_meta().is_root_data_page()
                   ? _reader->_meta.num_values() == _data_page.num_rows
                   : true);
    return Status::OK();
}
// Positions the iterator at row `idx`.
//
// Seeking to exactly `num_values()` is allowed and simply marks the iterator
// as positioned past the last value. Otherwise the data page containing `idx`
// is loaded if it is not the current one, and the page decoder is moved to
// the matching in-page position.
//
// @param idx  target ordinal; expected to be <= num_values() (debug-checked).
// @return NotSupported when the reader has no ordinal index, any error from
//         the index seek, page read or in-page seek; OK otherwise.
Status IndexedColumnIterator::seek_to_ordinal(ordinal_t idx) {
    DCHECK(idx <= _reader->num_values());

    if (!_reader->support_ordinal_seek()) {
        return Status::NotSupported("no ordinal index");
    }

    // One-past-the-end is a valid seek target.
    if (idx == _reader->num_values()) {
        _current_ordinal = idx;
        _seeked = true;
        return Status::OK();
    }

    const bool page_has_row = _data_page && _data_page.contains(idx);
    if (!page_has_row) {
        // The current page (if any) does not hold `idx`; fetch the one that does.
        if (!_reader->_has_index_page) {
            RETURN_IF_ERROR(_read_data_page(_reader->_sole_data_page));
        } else {
            std::string encoded_ordinal;
            KeyCoderTraits<FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT>::full_encode_ascending(
                    &idx, &encoded_ordinal);
            RETURN_IF_ERROR(_ordinal_iter.seek_at_or_before(encoded_ordinal));
            RETURN_IF_ERROR(_read_data_page(_ordinal_iter.current_page_pointer()));
            _current_iter = &_ordinal_iter;
        }
    }

    const ordinal_t pos_in_page = idx - _data_page.first_ordinal;
    RETURN_IF_ERROR(_data_page.data_decoder->seek_to_position_in_page(pos_in_page));
    DCHECK(pos_in_page == _data_page.data_decoder->current_index());

    _data_page.offset_in_page = pos_in_page;
    _current_ordinal = idx;
    _seeked = true;
    return Status::OK();
}
// Positions the iterator at the first value that is >= `key`.
//
// @param key          pointer to the value to look for.
// @param exact_match  set by the page decoder, or to false when the result is
//                     the first row of the following page.
// @return NotSupported when the reader has no value index, ENTRY_NOT_FOUND
//         when the column is empty or no such value exists, any other error
//         from the index seek / page read / in-page seek; OK otherwise.
Status IndexedColumnIterator::seek_at_or_after(const void* key, bool* exact_match) {
    if (!_reader->support_value_seek()) {
        return Status::NotSupported("no value index");
    }
    if (_reader->num_values() == 0) {
        return Status::Error<ErrorCode::ENTRY_NOT_FOUND>("value index is empty ");
    }

    g_index_reader_seek_count << 1;

    bool need_load = false;
    PagePointer target_page;
    if (_reader->_has_index_page) {
        // Use the value index to pick the data page that may hold `key`.
        std::string encoded_key;
        _reader->_value_key_coder->full_encode_ascending(key, &encoded_key);

        Status index_st = _value_iter.seek_at_or_before(encoded_key);
        if (!index_st.ok()) {
            if (!index_st.is<ENTRY_NOT_FOUND>()) {
                return index_st;
            }
            // Every key in the index is greater than `encoded_key`, so start
            // from the first page; otherwise pages could be skipped.
            // Example: predicate `col1 > 2` with index page [3,5,7]:
            // `seek_at_or_before(2)` reports ENTRY_NOT_FOUND, yet the
            // expected position is page `3`.
            _value_iter.seek_to_first();
        }

        target_page = _value_iter.current_page_pointer();
        _current_iter = &_value_iter;
        // Reload only when the wanted page differs from the current one.
        need_load = !_data_page || _data_page.page_pointer != target_page;
    } else if (!_data_page) {
        // No index page: the single data page is loaded on first use.
        need_load = true;
        target_page = PagePointer(_reader->_sole_data_page);
    }

    if (need_load) {
        RETURN_IF_ERROR(_read_data_page(target_page));
    }

    // Seek inside the data page.
    Status page_st = _data_page.data_decoder->seek_at_or_after_value(key, exact_match);

    // Not found in this page but another page follows: the answer is the
    // first row of that next page.
    if (page_st.is<ENTRY_NOT_FOUND>() && _reader->_has_index_page && _value_iter.has_next()) {
        _seeked = true;
        *exact_match = false;
        _current_ordinal = _data_page.first_ordinal + _data_page.num_rows;
        // Mark the current page as fully consumed.
        _data_page.offset_in_page = _data_page.num_rows;
        return Status::OK();
    }
    RETURN_IF_ERROR(page_st);

    _data_page.offset_in_page = _data_page.data_decoder->current_index();
    _current_ordinal = _data_page.first_ordinal + _data_page.offset_in_page;
    DCHECK(_data_page.contains(_current_ordinal));
    _seeked = true;
    return Status::OK();
}
// Appends up to `*n` values to `dst`, starting at the current position and
// crossing into following data pages when the reader has an index page.
//
// @param n    in: maximum number of rows wanted; out: number of rows read.
// @param dst  destination column the page decoder appends to.
// @return any error from reading a data page or decoding; OK otherwise.
//         The iterator must have been positioned by a seek (debug-checked);
//         `_seeked` is cleared after rows have been processed.
Status IndexedColumnIterator::next_batch(size_t* n, vectorized::MutableColumnPtr& dst) {
    DCHECK(_seeked);

    // Positioned past the last value: nothing to return.
    if (_current_ordinal == _reader->num_values()) {
        *n = 0;
        return Status::OK();
    }

    size_t rows_left = *n;
    for (; rows_left > 0;) {
        if (!_data_page.has_remaining()) {
            // Current page is exhausted; advance to the next one if there is one.
            if (!_reader->_has_index_page || !_current_iter->move_next()) {
                break; // no more data page
            }
            RETURN_IF_ERROR(_read_data_page(_current_iter->current_page_pointer()));
        }

        const size_t wanted = std::min(_data_page.remaining(), rows_left);
        size_t fetched = wanted;
        RETURN_IF_ERROR(_data_page.data_decoder->next_batch(&fetched, dst));
        DCHECK(wanted == fetched);

        _data_page.offset_in_page += fetched;
        _current_ordinal += fetched;
        rows_left -= fetched;
    }

    *n -= rows_left;
    _seeked = false;
    return Status::OK();
}
} // namespace segment_v2
} // namespace doris