blob: 48fe50469cc2068bf1eeadbfca50b3ad0d69ebc2 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "parallel_scanner_builder.h"
#include <cstddef>
#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet_hotspot.h"
#include "cloud/config.h"
#include "common/status.h"
#include "olap/base_tablet.h"
#include "olap/rowset/beta_rowset.h"
#include "olap/segment_loader.h"
#include "pipeline/exec/olap_scan_operator.h"
#include "vec/exec/scan/olap_scanner.h"
namespace doris {
using namespace vectorized;
Status ParallelScannerBuilder::build_scanners(std::list<ScannerSPtr>& scanners) {
RETURN_IF_ERROR(_load());
if (_scan_parallelism_by_segment) {
return _build_scanners_by_segment(scanners);
} else if (_is_dup_mow_key) {
// Default strategy for DUP/MOW tables: split by rowids within segments
return _build_scanners_by_rowid(scanners);
} else {
// TODO: support to split by key range
return Status::NotSupported("split by key range not supported yet.");
}
}
Status ParallelScannerBuilder::_build_scanners_by_rowid(std::list<ScannerSPtr>& scanners) {
DCHECK_GE(_rows_per_scanner, _min_rows_per_scanner);
for (auto&& [tablet, version] : _tablets) {
DCHECK(_all_read_sources.contains(tablet->tablet_id()));
auto& entire_read_source = _all_read_sources[tablet->tablet_id()];
if (config::is_cloud_mode()) {
// FIXME(plat1ko): Avoid pointer cast
ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_hotspot().count(*tablet);
}
// `rs_splits` in `entire read source` will be devided into several partitial read sources
// to build several parallel scanners, based on segment rows number. All the partitial read sources
// share the same delete predicates from their corresponding entire read source.
TabletReadSource partitial_read_source;
int64_t rows_collected = 0;
for (auto& rs_split : entire_read_source.rs_splits) {
auto reader = rs_split.rs_reader;
auto rowset = reader->rowset();
const auto rowset_id = rowset->rowset_id();
const auto& segments_rows = _all_segments_rows[rowset_id];
if (rowset->num_rows() == 0) {
continue;
}
int64_t segment_start = 0;
auto split = RowSetSplits(reader->clone());
for (size_t i = 0; i != segments_rows.size(); ++i) {
const size_t rows_of_segment = segments_rows[i];
RowRanges row_ranges;
int64_t offset_in_segment = 0;
// try to split large segments into RowRanges
while (offset_in_segment < rows_of_segment) {
const int64_t remaining_rows = rows_of_segment - offset_in_segment;
auto rows_need = _rows_per_scanner - rows_collected;
// 0.9: try to avoid splitting the segments into excessively small parts.
if (rows_need >= remaining_rows * 9 / 10) {
rows_need = remaining_rows;
}
DCHECK_LE(rows_need, remaining_rows);
// RowRange stands for range: [From, To), From is inclusive, To is exclusive.
row_ranges.add({offset_in_segment,
offset_in_segment + static_cast<int64_t>(rows_need)});
rows_collected += rows_need;
offset_in_segment += rows_need;
// If collected enough rows, build a new scanner
if (rows_collected >= _rows_per_scanner) {
split.segment_offsets.first = segment_start,
split.segment_offsets.second = i + 1;
split.segment_row_ranges.emplace_back(std::move(row_ranges));
DCHECK_EQ(split.segment_offsets.second - split.segment_offsets.first,
split.segment_row_ranges.size());
partitial_read_source.rs_splits.emplace_back(std::move(split));
scanners.emplace_back(_build_scanner(
tablet, version, _key_ranges,
{.rs_splits = std::move(partitial_read_source.rs_splits),
.delete_predicates = entire_read_source.delete_predicates,
.delete_bitmap = entire_read_source.delete_bitmap}));
partitial_read_source = {};
split = RowSetSplits(reader->clone());
row_ranges = RowRanges();
segment_start = offset_in_segment < rows_of_segment ? i : i + 1;
rows_collected = 0;
}
}
// The non-empty `row_ranges` means there are some rows left in this segment not added into `split`.
if (!row_ranges.is_empty()) {
DCHECK_GT(rows_collected, 0);
DCHECK_EQ(row_ranges.to(), rows_of_segment);
split.segment_row_ranges.emplace_back(std::move(row_ranges));
}
}
DCHECK_LE(rows_collected, _rows_per_scanner);
if (rows_collected > 0) {
split.segment_offsets.first = segment_start;
split.segment_offsets.second = segments_rows.size();
DCHECK_GT(split.segment_offsets.second, split.segment_offsets.first);
DCHECK_EQ(split.segment_row_ranges.size(),
split.segment_offsets.second - split.segment_offsets.first);
partitial_read_source.rs_splits.emplace_back(std::move(split));
}
} // end `for (auto& rowset : rowsets)`
DCHECK_LE(rows_collected, _rows_per_scanner);
if (rows_collected > 0) {
DCHECK_GT(partitial_read_source.rs_splits.size(), 0);
#ifndef NDEBUG
for (auto& split : partitial_read_source.rs_splits) {
DCHECK(split.rs_reader != nullptr);
DCHECK_LT(split.segment_offsets.first, split.segment_offsets.second);
DCHECK_EQ(split.segment_row_ranges.size(),
split.segment_offsets.second - split.segment_offsets.first);
}
#endif
scanners.emplace_back(
_build_scanner(tablet, version, _key_ranges,
{.rs_splits = std::move(partitial_read_source.rs_splits),
.delete_predicates = entire_read_source.delete_predicates,
.delete_bitmap = entire_read_source.delete_bitmap}));
}
}
return Status::OK();
}
// Build scanners so that each segment is exclusively scanned by a single scanner.
// This guarantees the number of scanners equals the number of segments across all rowsets
// for the involved tablets. It preserves delete predicates and key ranges, and clones
// RowsetReader per scanner to avoid sharing between scanners.
Status ParallelScannerBuilder::_build_scanners_by_segment(std::list<ScannerSPtr>& scanners) {
DCHECK_GE(_rows_per_scanner, _min_rows_per_scanner);
for (auto&& [tablet, version] : _tablets) {
DCHECK(_all_read_sources.contains(tablet->tablet_id()));
auto& entire_read_source = _all_read_sources[tablet->tablet_id()];
if (config::is_cloud_mode()) {
// FIXME(plat1ko): Avoid pointer cast
ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_hotspot().count(*tablet);
}
// Collect segments into scanners based on rows count instead of one scanner per segment
TabletReadSource partitial_read_source;
int64_t rows_collected = 0;
for (auto& rs_split : entire_read_source.rs_splits) {
auto reader = rs_split.rs_reader;
auto rowset = reader->rowset();
const auto rowset_id = rowset->rowset_id();
const auto& segments_rows = _all_segments_rows[rowset_id];
if (segments_rows.empty() || rowset->num_rows() == 0) {
continue;
}
int64_t segment_start = 0;
auto split = RowSetSplits(reader->clone());
for (size_t i = 0; i < segments_rows.size(); ++i) {
const size_t rows_of_segment = segments_rows[i];
// Check if adding this segment would exceed rows_per_scanner
// 0.9: try to avoid splitting the segments into excessively small parts.
if (rows_collected > 0 && (rows_collected + rows_of_segment > _rows_per_scanner &&
rows_collected < _rows_per_scanner * 9 / 10)) {
// Create a new scanner with collected segments
split.segment_offsets.first = segment_start;
split.segment_offsets.second =
i; // Range is [segment_start, i), including all segments from segment_start to i-1
DCHECK_GT(split.segment_offsets.second, split.segment_offsets.first);
partitial_read_source.rs_splits.emplace_back(std::move(split));
scanners.emplace_back(_build_scanner(
tablet, version, _key_ranges,
{.rs_splits = std::move(partitial_read_source.rs_splits),
.delete_predicates = entire_read_source.delete_predicates,
.delete_bitmap = entire_read_source.delete_bitmap}));
// Reset for next scanner
partitial_read_source = {};
split = RowSetSplits(reader->clone());
segment_start = i;
rows_collected = 0;
}
// Add current segment to the current scanner
rows_collected += rows_of_segment;
}
// Add remaining segments in this rowset to a scanner
if (rows_collected > 0) {
split.segment_offsets.first = segment_start;
split.segment_offsets.second = segments_rows.size();
DCHECK_GT(split.segment_offsets.second, split.segment_offsets.first);
partitial_read_source.rs_splits.emplace_back(std::move(split));
}
}
// Add remaining segments across all rowsets to a scanner
if (rows_collected > 0) {
DCHECK_GT(partitial_read_source.rs_splits.size(), 0);
#ifndef NDEBUG
for (auto& split : partitial_read_source.rs_splits) {
DCHECK(split.rs_reader != nullptr);
DCHECK_LT(split.segment_offsets.first, split.segment_offsets.second);
}
#endif
scanners.emplace_back(
_build_scanner(tablet, version, _key_ranges,
{.rs_splits = std::move(partitial_read_source.rs_splits),
.delete_predicates = entire_read_source.delete_predicates,
.delete_bitmap = entire_read_source.delete_bitmap}));
}
}
return Status::OK();
}
/**
* Load rowsets of each tablet with specified version, segments of each rowset.
*/
Status ParallelScannerBuilder::_load() {
_total_rows = 0;
size_t idx = 0;
for (auto&& [tablet, version] : _tablets) {
const auto tablet_id = tablet->tablet_id();
_all_read_sources[tablet_id] = _read_sources[idx];
const auto& read_source = _all_read_sources[tablet_id];
for (auto& rs_split : read_source.rs_splits) {
auto rowset = rs_split.rs_reader->rowset();
RETURN_IF_ERROR(rowset->load());
const auto rowset_id = rowset->rowset_id();
auto beta_rowset = std::dynamic_pointer_cast<BetaRowset>(rowset);
std::vector<uint32_t> segment_rows;
RETURN_IF_ERROR(beta_rowset->get_segment_num_rows(&segment_rows, &_builder_stats));
auto segment_count = rowset->num_segments();
for (int64_t i = 0; i != segment_count; i++) {
_all_segments_rows[rowset_id].emplace_back(segment_rows[i]);
}
_total_rows += rowset->num_rows();
}
idx++;
}
_rows_per_scanner = _total_rows / _max_scanners_count;
_rows_per_scanner = std::max<size_t>(_rows_per_scanner, _min_rows_per_scanner);
return Status::OK();
}
std::shared_ptr<OlapScanner> ParallelScannerBuilder::_build_scanner(
BaseTabletSPtr tablet, int64_t version, const std::vector<OlapScanRange*>& key_ranges,
TabletReadSource&& read_source) {
OlapScanner::Params params {_state, _scanner_profile.get(), key_ranges, std::move(tablet),
version, std::move(read_source), _limit, _is_preaggregation};
return OlapScanner::create_shared(_parent, std::move(params));
}
} // namespace doris