// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "cloud/cloud_cumulative_compaction_policy.h"
#include <algorithm>
#include <list>
#include <ostream>
#include <string>
#include "cloud/config.h"
#include "common/config.h"
#include "common/logging.h"
#include "cpp/sync_point.h"
#include "olap/cumulative_compaction_time_series_policy.h"
#include "olap/olap_common.h"
#include "olap/tablet.h"
#include "olap/tablet_meta.h"
#include "util/defer_op.h"
namespace doris {
#include "common/compile_check_begin.h"
// Size-based cumulative compaction policy for cloud tablets: rowsets are grouped by
// power-of-two size "levels" and promoted to the base range once large enough.
//
// @param promotion_size      upper bound (bytes) of the computed promotion size
// @param promotion_ratio     fraction of the tablet base size used to derive the promotion size
// @param promotion_min_size  lower bound (bytes) of the computed promotion size
// @param compaction_min_size minimum total input size (bytes) below which a low-score
//                            pick is discarded (see pick_input_rowsets)
CloudSizeBasedCumulativeCompactionPolicy::CloudSizeBasedCumulativeCompactionPolicy(
        int64_t promotion_size, double promotion_ratio, int64_t promotion_min_size,
        int64_t compaction_min_size)
        : _promotion_size(promotion_size),
          _promotion_ratio(promotion_ratio),
          _promotion_min_size(promotion_min_size),
          _compaction_min_size(compaction_min_size) {}
// Map a rowset size (bytes) to its compaction level: the largest power of two that
// does not exceed `size`, capped at the level of half the promotion size.
// Sizes below 1 KB all map to level 0.
int64_t CloudSizeBasedCumulativeCompactionPolicy::_level_size(const int64_t size) {
    if (size < 1024) {
        return 0;
    }
    // Largest power of two <= v (v must be positive; __builtin_clzl(0) is undefined).
    auto floor_pow2 = [](int64_t v) -> int64_t {
        return (int64_t)1 << (sizeof(v) * 8 - 1 - __builtin_clzl(v));
    };
    const int64_t top_level = floor_pow2(_promotion_size / 2);
    return size >= top_level ? top_level : floor_pow2(size);
}
void find_longest_consecutive_empty_rowsets(std::vector<RowsetSharedPtr>* result,
const std::vector<RowsetSharedPtr>& candidate_rowsets) {
std::vector<RowsetSharedPtr> current_sequence;
std::vector<RowsetSharedPtr> longest_sequence;
for (size_t i = 0; i < candidate_rowsets.size(); ++i) {
auto& rowset = candidate_rowsets[i];
// Check if rowset is empty and has no delete predicate
if (rowset->num_segments() == 0 && !rowset->rowset_meta()->has_delete_predicate()) {
// Check if this is consecutive with previous rowset
if (current_sequence.empty() ||
(current_sequence.back()->end_version() == rowset->start_version() - 1)) {
current_sequence.push_back(rowset);
} else {
// Start new sequence if not consecutive
if (current_sequence.size() > longest_sequence.size()) {
longest_sequence = current_sequence;
}
current_sequence.clear();
current_sequence.push_back(rowset);
}
} else {
// Non-empty rowset, check if we have a sequence to compare
if (current_sequence.size() > longest_sequence.size()) {
longest_sequence = current_sequence;
}
current_sequence.clear();
}
}
// Check final sequence
if (current_sequence.size() > longest_sequence.size()) {
longest_sequence = current_sequence;
}
*result = longest_sequence;
}
// Pick the rowsets to feed into the next cumulative compaction of `tablet`.
//
// Walks `candidate_rowsets` (assumed to be in version order) and accumulates a
// contiguous prefix into `*input_rowsets`, honoring in order:
//   1. delete predicates: stop at the first delete version unless `allow_delete`
//      (its version is reported via `*last_delete_version`);
//   2. alter state: for a TABLET_NOTREADY tablet, keep the latest 10 versions out
//      of the pick so the base tablet's data can still be copied;
//   3. max score: a DEFER trims rowsets from the back until the accumulated score
//      fits `max_compaction_score` (skipped for empty-rowset compaction);
//   4. empty-rowset compaction: if enabled and a long-enough consecutive run of
//      empty rowsets dominates the pick, compact just that run;
//   5. size levels: drop leading rowsets whose level exceeds the level of the
//      remaining total size;
//   6. min thresholds: discard picks that are both smaller than
//      `_compaction_min_size` and below `min_compaction_score`, and single
//      non-overlapping rowsets that need no compaction.
//
// `*compaction_score` is set to the score of the final pick. The return value is
// `transient_size` — the number of rowsets accumulated during the scan, which may
// differ from the final `input_rowsets->size()` after pruning/trimming.
int64_t CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
        CloudTablet* tablet, const std::vector<RowsetSharedPtr>& candidate_rowsets,
        const int64_t max_compaction_score, const int64_t min_compaction_score,
        std::vector<RowsetSharedPtr>* input_rowsets, Version* last_delete_version,
        size_t* compaction_score, bool allow_delete) {
    // Debug hook: force a specific [start_version, end_version] pick for one tablet.
    DBUG_EXECUTE_IF(
            "CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets", {
                auto target_tablet_id = dp->param<int64_t>("tablet_id", -1);
                if (target_tablet_id == tablet->tablet_id()) {
                    auto start_version = dp->param<int64_t>("start_version", -1);
                    auto end_version = dp->param<int64_t>("end_version", -1);
                    for (auto& rowset : candidate_rowsets) {
                        if (rowset->start_version() >= start_version &&
                            rowset->end_version() <= end_version) {
                            input_rowsets->push_back(rowset);
                        }
                    }
                    LOG_INFO(
                            "[CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_"
                            "input_rowsets] tablet_id={}, start={}, end={}, "
                            "input_rowsets->size()={}",
                            target_tablet_id, start_version, end_version, input_rowsets->size());
                    return input_rowsets->size();
                }
            })

    size_t promotion_size = cloud_promotion_size(tablet);
    auto max_version = tablet->max_version().first;
    int transient_size = 0;
    *compaction_score = 0;
    int64_t total_size = 0;
    bool skip_trim = false; // Skip trim for Empty Rowset Compaction

    // DEFER: trim input_rowsets from back if score > max_compaction_score
    // This ensures we don't return more rowsets than allowed by max_compaction_score,
    // while still collecting enough rowsets to pass min_compaction_score check after level_size removal.
    // Must be placed after variable initialization and before collection loop.
    DEFER({
        if (skip_trim) {
            return;
        }
        // Keep at least 1 rowset to avoid removing the only rowset (consistent with fallback branch)
        while (input_rowsets->size() > 1 &&
               *compaction_score > static_cast<size_t>(max_compaction_score)) {
            auto& last_rowset = input_rowsets->back();
            *compaction_score -= last_rowset->rowset_meta()->get_compaction_score();
            total_size -= last_rowset->rowset_meta()->total_disk_size();
            input_rowsets->pop_back();
        }
    });

    for (auto& rowset : candidate_rowsets) {
        // check whether this rowset is delete version
        if (!allow_delete && rowset->rowset_meta()->has_delete_predicate()) {
            *last_delete_version = rowset->version();
            if (!input_rowsets->empty()) {
                // we meet a delete version, and there were other versions before.
                // we should compact those version before handling them over to base compaction
                break;
            } else {
                // we meet a delete version, and no other versions before, skip it and continue
                input_rowsets->clear();
                *compaction_score = 0;
                transient_size = 0;
                continue;
            }
        }
        if (tablet->tablet_state() == TABLET_NOTREADY) {
            // If tablet under alter, keep latest 10 version so that base tablet max version
            // not merged in new tablet, and then we can copy data from base tablet
            if (rowset->version().second < max_version - 10) {
                continue;
            }
        }
        // Removed: max_compaction_score check here
        // We now collect all candidate rowsets and trim from back at return time via DEFER
        *compaction_score += rowset->rowset_meta()->get_compaction_score();
        total_size += rowset->rowset_meta()->total_disk_size();
        transient_size += 1;
        input_rowsets->push_back(rowset);
    }

    // Picks at promotion size or beyond are always worth compacting; no further pruning.
    if (total_size >= promotion_size) {
        return transient_size;
    }

    // if there is delete version, do compaction directly
    if (last_delete_version->first != -1) {
        if (input_rowsets->size() == 1) {
            auto rs_meta = input_rowsets->front()->rowset_meta();
            // if there is only one rowset and not overlapping,
            // we do not need to do cumulative compaction
            if (!rs_meta->is_segments_overlapping()) {
                input_rowsets->clear();
                *compaction_score = 0;
            }
        }
        return transient_size;
    }

    // Check if empty rowset compaction strategy is enabled
    if (config::enable_empty_rowset_compaction && !input_rowsets->empty()) {
        // Check if input_rowsets contain consecutive empty rowsets that meet criteria
        std::vector<RowsetSharedPtr> consecutive_empty_rowsets;
        find_longest_consecutive_empty_rowsets(&consecutive_empty_rowsets, *input_rowsets);
        if (!consecutive_empty_rowsets.empty() &&
            consecutive_empty_rowsets.size() >= config::empty_rowset_compaction_min_count &&
            static_cast<double>(consecutive_empty_rowsets.size()) /
                            static_cast<double>(input_rowsets->size()) >=
                    config::empty_rowset_compaction_min_ratio) {
            // Prioritize consecutive empty rowset compaction
            // Skip trim: empty rowset compaction has very low cost and the goal is to reduce rowset count
            *input_rowsets = consecutive_empty_rowsets;
            *compaction_score = consecutive_empty_rowsets.size();
            skip_trim = true;
            return consecutive_empty_rowsets.size();
        }
    }

    // Level pruning: pop rowsets from the front while a single rowset's level exceeds
    // the level of everything behind it (such a large rowset should wait and be the
    // base of a later, bigger compaction instead).
    auto rs_begin = input_rowsets->begin();
    size_t new_compaction_score = *compaction_score;
    while (rs_begin != input_rowsets->end()) {
        auto& rs_meta = (*rs_begin)->rowset_meta();
        int64_t current_level = _level_size(rs_meta->total_disk_size());
        int64_t remain_level = _level_size(total_size - rs_meta->total_disk_size());
        // if current level less then remain level, input rowsets contain current rowset
        // and process return; otherwise, input rowsets do not contain current rowset.
        if (current_level <= remain_level) {
            break;
        }
        total_size -= rs_meta->total_disk_size();
        new_compaction_score -= rs_meta->get_compaction_score();
        ++rs_begin;
    }
    if (rs_begin == input_rowsets->end()) { // No suitable level size found in `input_rowsets`
        if (config::prioritize_query_perf_in_compaction && tablet->keys_type() != DUP_KEYS) {
            // While tablet's key type is not `DUP_KEYS`, compacting rowset in such tablets has a significant
            // positive impact on queries and reduces space amplification, so we ignore level limitation and
            // pick candidate rowsets as input rowsets.
            return transient_size;
        } else if (*compaction_score >= max_compaction_score) {
            // Score of `input_rowsets` exceed max compaction score, which means `input_rowsets` will never change and
            // this tablet will never execute cumulative compaction. MUST execute compaction on these `input_rowsets`
            // to reduce compaction score.
            RowsetSharedPtr rs_with_max_score;
            uint32_t max_score = 1;
            for (auto& rs : *input_rowsets) {
                if (rs->rowset_meta()->get_compaction_score() > max_score) {
                    max_score = rs->rowset_meta()->get_compaction_score();
                    rs_with_max_score = rs;
                }
            }
            if (rs_with_max_score) {
                input_rowsets->clear();
                input_rowsets->push_back(std::move(rs_with_max_score));
                *compaction_score = max_score;
                return transient_size;
            }
            // no rowset is OVERLAPPING, return all input rowsets (DEFER will trim to max_compaction_score)
            return transient_size;
        }
    }
    input_rowsets->erase(input_rowsets->begin(), rs_begin);
    *compaction_score = new_compaction_score;

    VLOG_CRITICAL << "cumulative compaction size_based policy, compaction_score = "
                  << *compaction_score << ", total_size = " << total_size
                  << ", calc promotion size value = " << promotion_size
                  << ", tablet = " << tablet->tablet_id() << ", input_rowset size "
                  << input_rowsets->size();

    // empty return
    if (input_rowsets->empty()) {
        return transient_size;
    }

    // if we have a sufficient number of segments, we should process the compaction.
    // otherwise, we check number of segments and total_size whether can do compaction.
    if (total_size < _compaction_min_size && *compaction_score < min_compaction_score) {
        input_rowsets->clear();
        *compaction_score = 0;
    } else if (total_size >= _compaction_min_size && input_rowsets->size() == 1) {
        auto rs_meta = input_rowsets->front()->rowset_meta();
        // if there is only one rowset and not overlapping,
        // we do not need to do compaction
        if (!rs_meta->is_segments_overlapping()) {
            input_rowsets->clear();
            *compaction_score = 0;
        }
    }
    return transient_size;
}
// Compute this tablet's promotion size: `base_size * _promotion_ratio`, bounded by
// `_promotion_size` from above and `_promotion_min_size` from below (upper bound is
// checked first, matching the historical behavior when the bounds cross).
int64_t CloudSizeBasedCumulativeCompactionPolicy::cloud_promotion_size(CloudTablet* t) const {
    const int64_t derived = int64_t(cast_set<double>(t->base_size()) * _promotion_ratio);
    if (derived > _promotion_size) {
        return _promotion_size;
    }
    if (derived < _promotion_min_size) {
        return _promotion_min_size;
    }
    return derived;
}
// Compute the new cumulative point after a cumulative compaction produced `output_rowset`.
//
// The point advances past the output rowset (end_version + 1), effectively promoting it
// into the base compaction range, when any of the following holds:
//   - the compaction range contained a delete version (`last_delete_version.first != -1`),
//   - the output rowset's disk size reaches the tablet's promotion size,
//   - for merge-on-write unique-key tablets, the output spans more than
//     `config::compaction_promotion_version_count` versions (see comment below).
// Otherwise the previous cumulative point is returned unchanged.
int64_t CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point(
        CloudTablet* tablet, const RowsetSharedPtr& output_rowset, Version& last_delete_version,
        int64_t last_cumulative_point) {
    // Test hook: allow unit tests to stub out the result entirely.
    TEST_INJECTION_POINT_RETURN_WITH_VALUE("new_cumulative_point", int64_t(0), output_rowset.get(),
                                           last_cumulative_point);
    // Debug hook: force a specific cumulative point for one tablet.
    DBUG_EXECUTE_IF("CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point", {
        auto target_tablet_id = dp->param<int64_t>("tablet_id", -1);
        auto cumu_point = dp->param<int64_t>("cumu_point", -1);
        if (target_tablet_id == tablet->tablet_id() && cumu_point != -1) {
            LOG_INFO(
                    "[CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point] "
                    "tablet_id={}, cumu_point={}",
                    target_tablet_id, cumu_point);
            return cumu_point;
        }
    });
    // for MoW table, if there's too many versions, the delete bitmap will grow to
    // a very big size, which may cause the tablet meta too big and the `save_meta`
    // operation too slow.
    // if the rowset should not promotion according to it's disk size, we should also
    // consider it's version count here.
    bool satisfy_promotion_version = tablet->enable_unique_key_merge_on_write() &&
                                     output_rowset->end_version() - output_rowset->start_version() >
                                             config::compaction_promotion_version_count;
    // if rowsets have delete version, move to the last directly.
    // if rowsets have no delete version, check output_rowset total disk size satisfies promotion size.
    return (last_delete_version.first != -1 ||
            output_rowset->total_disk_size() >= cloud_promotion_size(tablet) ||
            satisfy_promotion_version)
                   ? output_rowset->end_version() + 1
                   : last_cumulative_point;
}
// Time-series variant: delegate the pick to the shared TimeSeriesCumulativeCompactionPolicy,
// supplying the tablet's last successful cumulative compaction time as the extra input.
int64_t CloudTimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
        CloudTablet* tablet, const std::vector<RowsetSharedPtr>& candidate_rowsets,
        const int64_t max_compaction_score, const int64_t min_compaction_score,
        std::vector<RowsetSharedPtr>* input_rowsets, Version* last_delete_version,
        size_t* compaction_score, bool allow_delete) {
    return TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
            tablet, tablet->last_cumu_compaction_success_time(), candidate_rowsets,
            max_compaction_score, min_compaction_score, input_rowsets, last_delete_version,
            compaction_score, allow_delete);
}
// Delegate level computation to the shared time-series implementation.
// Fixed: replaced the C-style cast `(BaseTablet*)tablet` with `static_cast` — the
// upcast is checked at compile time and the named cast is greppable and intent-revealing.
int64_t CloudTimeSeriesCumulativeCompactionPolicy::get_compaction_level(
        CloudTablet* tablet, const std::vector<RowsetSharedPtr>& input_rowsets,
        RowsetSharedPtr output_rowset) {
    return TimeSeriesCumulativeCompactionPolicy::get_compaction_level(
            static_cast<BaseTablet*>(tablet), input_rowsets, output_rowset);
}
// Time-series variant of the new-cumulative-point computation: advance past the output
// rowset only when the tablet is running, the output is non-empty, and (when the tablet
// uses level-based time-series compaction, threshold >= 2) the output reached level 2.
int64_t CloudTimeSeriesCumulativeCompactionPolicy::new_cumulative_point(
        CloudTablet* tablet, const RowsetSharedPtr& output_rowset, Version& last_delete_version,
        int64_t last_cumulative_point) {
    const bool tablet_running = tablet->tablet_state() == TABLET_RUNNING;
    const bool has_output = output_rowset->num_segments() != 0;
    const bool level_too_low =
            tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2 &&
            output_rowset->rowset_meta()->compaction_level() < 2;
    if (!tablet_running || !has_output || level_too_low) {
        return last_cumulative_point;
    }
    return output_rowset->end_version() + 1;
}
#include "common/compile_check_end.h"
} // namespace doris