blob: 6f74f86bf9dd509ea3a7f721a5140b540ffac40a [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "olap/cumulative_compaction_policy.h"
#include <algorithm>
#include <list>
#include <ostream>
#include <string>
#include "common/config.h"
#include "common/logging.h"
#include "olap/cumulative_compaction_time_series_policy.h"
#include "olap/olap_common.h"
#include "olap/tablet.h"
#include "olap/tablet_meta.h"
#include "util/debug_points.h"
namespace doris {
#include "common/compile_check_begin.h"
// Builds a size-based cumulative compaction policy from its tuning knobs.
// The arguments are only stored here; they are consumed by the other methods of this class:
//   promotion_size          - upper cap applied to the promotion size computed in
//                             _calc_promotion_size(); also bounds the level in _level_size().
//   promotion_ratio         - factor multiplied with the base rowset's disk size to derive
//                             the promotion size.
//   promotion_min_size      - lower bound applied to the computed promotion size.
//   promotion_version_count - version-span threshold checked in update_cumulative_point()
//                             for merge-on-write tablets.
//   compaction_min_size     - total-size threshold checked at the end of pick_input_rowsets().
SizeBasedCumulativeCompactionPolicy::SizeBasedCumulativeCompactionPolicy(
int64_t promotion_size, double promotion_ratio, int64_t promotion_min_size,
int64_t promotion_version_count, int64_t compaction_min_size)
: _promotion_size(promotion_size),
_promotion_ratio(promotion_ratio),
_promotion_min_size(promotion_min_size),
_promotion_version_count(promotion_version_count),
_compaction_min_size(compaction_min_size) {}
// Computes the initial cumulative point of `tablet` from its rowset metas.
//
// `*ret_cumulative_point` is always reset to Tablet::K_INVALID_CUMULATIVE_POINT first and is
// only given a real value when `current_cumulative_point` is still invalid and `all_metas`
// is not empty:
//   - TABLET_RUNNING:  walk the rowsets in ascending start-version order and stop at the
//                      first version hole, the first non-delete rowset with overlapping
//                      segments, or the first non-delete, non-base rowset smaller than the
//                      promotion size.
//   - TABLET_NOTREADY: use the start version of the first rowset whose start version is > 0.
//   - any other state: the point stays invalid.
void SizeBasedCumulativeCompactionPolicy::calculate_cumulative_point(
        Tablet* tablet, const RowsetMetaMapContainer& all_metas, int64_t current_cumulative_point,
        int64_t* ret_cumulative_point) {
    *ret_cumulative_point = Tablet::K_INVALID_CUMULATIVE_POINT;

    // The point is calculated only once; afterwards it moves along with the compaction
    // process. With no rowsets at all there is nothing to calculate either.
    const bool already_initialized =
            current_cumulative_point != Tablet::K_INVALID_CUMULATIVE_POINT;
    if (already_initialized || all_metas.empty()) {
        return;
    }

    // Order the rowset metas by ascending start version. Comparing only the start version
    // is enough because two versions never overlap.
    std::list<RowsetMetaSharedPtr> existing_rss;
    for (const auto& [_, meta] : all_metas) {
        existing_rss.push_back(meta);
    }
    existing_rss.sort([](const RowsetMetaSharedPtr& lhs, const RowsetMetaSharedPtr& rhs) {
        return lhs->version().first < rhs->version().first;
    });

    auto base_rowset_meta = existing_rss.begin();
    if (tablet->tablet_state() == TABLET_RUNNING) {
        // A running tablet must have a base rowset starting at version 0. Tablets in any
        // other state may legitimately lack a base version.
        CHECK((*base_rowset_meta)->start_version() == 0);

        int64_t promotion_size = 0;
        _calc_promotion_size(tablet, *base_rowset_meta, &promotion_size);

        int64_t last_end_version = -1;
        for (const RowsetMetaSharedPtr& meta : existing_rss) {
            const int64_t start_version = meta->version().first;
            if (start_version > last_end_version + 1) {
                // Version hole: do not look any further.
                break;
            }

            // A non-delete rowset stays in the cumulative layer when its segments overlap,
            // or when it is not the base rowset and is still below the promotion size.
            const bool stays_cumulative =
                    !meta->has_delete_predicate() &&
                    (meta->is_segments_overlapping() ||
                     (start_version != 0 && meta->total_disk_size() < promotion_size));
            if (stays_cumulative) {
                *ret_cumulative_point = start_version;
                break;
            }

            // Everything else (delete rowsets, the base rowset, large non-overlapping
            // rowsets) lies below the cumulative point, so the point moves past it.
            last_end_version = meta->version().second;
            *ret_cumulative_point = last_end_version + 1;
        }

        VLOG_NOTICE
                << "cumulative compaction size_based policy, calculate cumulative point value = "
                << *ret_cumulative_point << ", calc promotion size value = " << promotion_size
                << " tablet = " << tablet->tablet_id();
    } else if (tablet->tablet_state() == TABLET_NOTREADY) {
        // Tablet under alter process: the cumulative point is the version right after the
        // base version, i.e. the first rowset that does not start at 0.
        const auto first_non_base =
                std::find_if(existing_rss.begin(), existing_rss.end(),
                             [](const RowsetMetaSharedPtr& meta) {
                                 return meta->version().first > 0;
                             });
        if (first_non_base != existing_rss.end()) {
            *ret_cumulative_point = (*first_non_base)->version().first;
        }
    }
}
// Derives the promotion size from the disk size of `base_rowset_meta`, writes it to
// `*promotion_size` and records it on `tablet`.
//
// The raw value is base size * _promotion_ratio. It is capped at _promotion_size and,
// when it does not hit that cap, raised to at least _promotion_min_size.
void SizeBasedCumulativeCompactionPolicy::_calc_promotion_size(Tablet* tablet,
                                                               RowsetMetaSharedPtr base_rowset_meta,
                                                               int64_t* promotion_size) {
    const int64_t base_bytes = base_rowset_meta->total_disk_size();
    int64_t bounded = static_cast<int64_t>(cast_set<double>(base_bytes) * _promotion_ratio);

    // The cap is checked first on purpose: the lower bound is only considered when the
    // value is below the cap.
    if (bounded >= _promotion_size) {
        bounded = _promotion_size;
    } else if (bounded <= _promotion_min_size) {
        bounded = _promotion_min_size;
    }

    *promotion_size = bounded;
    _refresh_tablet_promotion_size(tablet, bounded);
}
// Records `promotion_size` on `tablet` via Tablet::set_cumulative_promotion_size().
// update_cumulative_point() and pick_input_rowsets() later read the value back through
// Tablet::cumulative_promotion_size().
void SizeBasedCumulativeCompactionPolicy::_refresh_tablet_promotion_size(Tablet* tablet,
int64_t promotion_size) {
tablet->set_cumulative_promotion_size(promotion_size);
}
// Moves the cumulative point of `tablet` past `output_rowset` when that rowset should be
// promoted out of the cumulative layer. `input_rowsets` is not consulted.
//
// Nothing happens unless the tablet is TABLET_RUNNING. Otherwise the point is set to
// output_rowset->end_version() + 1 when any of the following holds (checked in this order):
//   1. a delete version was involved (last_delete_version.first != -1);
//   2. the output rowset's disk size reached the tablet's promotion size;
//   3. the tablet is merge-on-write and the output rowset spans more than
//      _promotion_version_count versions.
void SizeBasedCumulativeCompactionPolicy::update_cumulative_point(
        Tablet* tablet, const std::vector<RowsetSharedPtr>& input_rowsets,
        RowsetSharedPtr output_rowset, Version& last_delete_version) {
    // A tablet under alter process keeps its cumulative point untouched.
    if (tablet->tablet_state() != TABLET_RUNNING) {
        return;
    }

    // Rowsets with a delete version move to the end directly.
    bool promote = last_delete_version.first != -1;
    if (!promote) {
        // No delete version: promote once the output is big enough...
        const size_t output_bytes = output_rowset->rowset_meta()->total_disk_size();
        promote = output_bytes >= tablet->cumulative_promotion_size() ||
                  // ...or, for MoW tables, once it covers too many versions. With many
                  // versions the delete bitmap grows very large, which may make the tablet
                  // meta too big and the `save_meta` operation too slow, so the version
                  // count is considered even when the disk size alone would not promote.
                  (tablet->enable_unique_key_merge_on_write() &&
                   output_rowset->end_version() - output_rowset->start_version() >
                           _promotion_version_count);
    }

    if (promote) {
        tablet->set_cumulative_layer_point(output_rowset->end_version() + 1);
    }
}
// Returns the cumulative compaction score of `tablet`.
//
// The score is the sum of get_compaction_score() over the local rowsets whose end version
// is at or above the cumulative point. When their total disk size is below the promotion
// size, leading rowsets whose size level exceeds the level of everything after them are
// excluded from the score. Returns 0 when the tablet has no rowsets, or when it is
// TABLET_RUNNING but has no rowset starting at version 0.
//
// As a side effect the tablet's promotion size is refreshed through _calc_promotion_size().
uint32_t SizeBasedCumulativeCompactionPolicy::calc_cumulative_compaction_score(Tablet* tablet) {
    const int64_t cumulative_point = tablet->cumulative_layer_point();

    uint32_t score = 0;
    int64_t cumulative_bytes = 0;
    bool has_base_rowset = false;
    int64_t lowest_start_version = INT64_MAX;
    RowsetMetaSharedPtr lowest_meta;
    std::vector<RowsetMetaSharedPtr> cumulative_metas;

    // NOTE: tablet._meta_lock is hold
    auto& rs_metas = tablet->tablet_meta()->all_rs_metas();
    for (const auto& [_, meta] : rs_metas) {
        const auto start_version = meta->start_version();

        // Remember the rowset with the smallest start version and whether a base rowset
        // (start version 0) exists at all.
        if (start_version < lowest_start_version) {
            lowest_start_version = start_version;
            lowest_meta = meta;
        }
        if (start_version == 0) {
            has_base_rowset = true;
        }

        // Only local rowsets at or above the cumulative point belong to the cumulative
        // part. all_rs_metas() is not sorted, so every entry has to be inspected.
        if (meta->end_version() >= cumulative_point && meta->is_local()) {
            cumulative_bytes += meta->total_disk_size();
            score += meta->get_compaction_score();
            cumulative_metas.push_back(meta);
        }
    }

    if (!lowest_meta) {
        return 0;
    }

    // The "first" (not necessarily base) rowset drives the promotion size, because a
    // tablet under alter operation may have no base version.
    int64_t promotion_size = 0;
    _calc_promotion_size(tablet, lowest_meta, &promotion_size);

    // A running tablet without a base version is abnormal: never select it.
    if (!has_base_rowset && tablet->tablet_state() == TABLET_RUNNING) {
        LOG(WARNING) << "tablet state is running but have no base version";
        return 0;
    }

    // Enough data to reach the promotion size: the whole cumulative part counts.
    if (cumulative_bytes >= promotion_size) {
        return score;
    }

    std::sort(cumulative_metas.begin(), cumulative_metas.end(), RowsetMeta::comparator);

    // Drop leading rowsets that are a level above the rest.
    // eg: sizes of the cumulative rowsets are 128, 16, 16, 16
    //     -> [16, 16, 16] is what would be compacted.
    for (auto& meta : cumulative_metas) {
        const int64_t rowset_bytes = meta->total_disk_size();
        const int64_t current_level = _level_size(rowset_bytes);
        const int64_t remain_level = _level_size(cumulative_bytes - rowset_bytes);
        if (current_level <= remain_level) {
            // This rowset and everything after it stay in the score.
            break;
        }
        cumulative_bytes -= rowset_bytes;
        score -= meta->get_compaction_score();
    }
    return score;
}
// Chooses the rowsets that the next cumulative compaction should merge.
//
// `candidate_rowsets` is scanned in the given order. Selected rowsets are appended to
// `*input_rowsets`, and the sum of their compaction scores is written to `*compaction_score`.
// When `allow_delete` is false and a rowset with a delete predicate is met, its version is
// stored in `*last_delete_version` and the scan either stops (rowsets were already picked)
// or skips it (nothing picked yet).
//
// After the scan the selection may be trimmed from the front by size level, reduced to the
// single rowset with the highest score, or cleared entirely when compacting is not worthwhile.
//
// Returns `transient_size`: the number of rowsets appended during the scan, NOT the final
// size of `*input_rowsets` after trimming. Only the first debug point returns
// `input_rowsets->size()` instead.
//
// NOTE(review): `last_delete_version->first` is compared against -1 below, so the caller is
// presumably expected to pass it in initialized to -1 - confirm against callers.
int SizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
Tablet* tablet, const std::vector<RowsetSharedPtr>& candidate_rowsets,
const int64_t max_compaction_score, const int64_t min_compaction_score,
std::vector<RowsetSharedPtr>* input_rowsets, Version* last_delete_version,
size_t* compaction_score, bool allow_delete) {
// Debug point: for the tablet named by "tablet_id", pick exactly the candidates that lie
// within [start_version, end_version] and return right away. Note that the return below is
// inside the debug-point body but outside the tablet-id check.
DBUG_EXECUTE_IF("SizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets", {
auto target_tablet_id = dp->param<int64_t>("tablet_id", -1);
if (target_tablet_id == tablet->tablet_id()) {
auto start_version = dp->param<int64_t>("start_version", -1);
auto end_version = dp->param<int64_t>("end_version", -1);
for (auto& rowset : candidate_rowsets) {
if (rowset->start_version() >= start_version &&
rowset->end_version() <= end_version) {
input_rowsets->push_back(rowset);
}
}
}
return cast_set<uint32_t>(input_rowsets->size());
})
size_t promotion_size = tablet->cumulative_promotion_size();
auto max_version = tablet->max_version().first;
// number of rowsets appended by the scan below; this is the value returned to the caller
int transient_size = 0;
*compaction_score = 0;
// accumulated disk size of the rowsets appended by the scan below
int64_t total_size = 0;
for (auto& rowset : candidate_rowsets) {
// check whether this rowset is delete version
if (!allow_delete && rowset->rowset_meta()->has_delete_predicate()) {
*last_delete_version = rowset->version();
if (!input_rowsets->empty()) {
// we meet a delete version, and there were other versions before.
// we should compact those version before handling them over to base compaction
break;
} else {
// we meet a delete version, and no other versions before, skip it and continue
// (input_rowsets is empty here, so the resets below only restore the initial state)
input_rowsets->clear();
*compaction_score = 0;
transient_size = 0;
continue;
}
}
if (tablet->tablet_state() == TABLET_NOTREADY) {
// If tablet under alter, keep latest 10 version so that base tablet max version
// not merged in new tablet, and then we can copy data from base tablet
if (rowset->version().second < max_version - 10) {
continue;
}
}
// The limit is checked before adding the current rowset, so the accumulated score may
// end up above max_compaction_score by the score of the last rowset added.
if (*compaction_score >= max_compaction_score) {
// got enough segments
break;
}
*compaction_score += rowset->rowset_meta()->get_compaction_score();
total_size += rowset->rowset_meta()->total_disk_size();
transient_size += 1;
input_rowsets->push_back(rowset);
}
// Debug point: return the raw scan result without any of the trimming below.
DBUG_EXECUTE_IF("SizeBaseCumulativeCompactionPolicy.pick_input_rowsets.return_input_rowsets",
{ return transient_size; })
// Enough data to reach the promotion size: compact everything that was picked.
if (total_size >= promotion_size) {
return transient_size;
}
// if there is delete version, do compaction directly
if (last_delete_version->first != -1) {
if (input_rowsets->size() == 1) {
auto rs_meta = input_rowsets->front()->rowset_meta();
// if there is only one rowset and not overlapping,
// we do not need to do cumulative compaction
if (!rs_meta->is_segments_overlapping()) {
input_rowsets->clear();
*compaction_score = 0;
}
}
return transient_size;
}
// Level-based trimming: walk from the front and drop every rowset whose size level is
// above the level of all the data that follows it. `rs_begin` ends up at the first rowset
// to keep; `total_size` and `new_compaction_score` describe the kept suffix.
auto rs_begin = input_rowsets->begin();
size_t new_compaction_score = *compaction_score;
while (rs_begin != input_rowsets->end()) {
auto& rs_meta = (*rs_begin)->rowset_meta();
int64_t current_level = _level_size(rs_meta->total_disk_size());
int64_t remain_level = _level_size(total_size - rs_meta->total_disk_size());
// if current level is not greater than remain level, input rowsets contain current rowset
// and the trimming stops; otherwise, input rowsets do not contain current rowset.
if (current_level <= remain_level) {
break;
}
total_size -= rs_meta->total_disk_size();
new_compaction_score -= rs_meta->get_compaction_score();
++rs_begin;
}
if (rs_begin == input_rowsets->end() && *compaction_score >= max_compaction_score) {
// No suitable level size found in `input_rowsets` but score of `input_rowsets` exceed max compaction score,
// which means `input_rowsets` will never change and this tablet will never execute cumulative compaction.
// MUST execute compaction on these `input_rowsets` to reduce compaction score.
RowsetSharedPtr rs_with_max_score;
// Starting at 1 means only a rowset whose score is above 1 can be selected here.
uint32_t max_score = 1;
for (auto& rs : *input_rowsets) {
if (rs->rowset_meta()->get_compaction_score() > max_score) {
max_score = rs->rowset_meta()->get_compaction_score();
rs_with_max_score = rs;
}
}
if (rs_with_max_score) {
// Compact only the single rowset with the highest score.
input_rowsets->clear();
input_rowsets->push_back(std::move(rs_with_max_score));
*compaction_score = max_score;
return transient_size;
}
// no rowset is OVERLAPPING, execute compaction on all input rowsets
return transient_size;
}
// Apply the trimming: drop the prefix [begin, rs_begin) and publish the reduced score.
input_rowsets->erase(input_rowsets->begin(), rs_begin);
*compaction_score = new_compaction_score;
VLOG_CRITICAL << "cumulative compaction size_based policy, compaction_score = "
<< *compaction_score << ", total_size = " << total_size
<< ", calc promotion size value = " << promotion_size
<< ", tablet = " << tablet->tablet_id() << ", input_rowset size "
<< input_rowsets->size();
// empty return
if (input_rowsets->empty()) {
return transient_size;
}
// if we have a sufficient number of segments, we should process the compaction.
// otherwise, we check number of segments and total_size whether can do compaction.
if (total_size < _compaction_min_size && *compaction_score < min_compaction_score) {
// too little data and too low a score: not worth compacting now
input_rowsets->clear();
*compaction_score = 0;
} else if (total_size >= _compaction_min_size && input_rowsets->size() == 1) {
auto rs_meta = input_rowsets->front()->rowset_meta();
// if there is only one rowset and not overlapping,
// we do not need to do compaction
if (!rs_meta->is_segments_overlapping()) {
input_rowsets->clear();
*compaction_score = 0;
}
}
return transient_size;
}
// Maps a byte size to its "level": the largest power of two that does not exceed it.
//
// Sizes below 1024 are level 0. The result is capped at the largest power of two that
// does not exceed _promotion_size / 2.
//
// NOTE(review): __builtin_clzl(0) is undefined, so this relies on _promotion_size being at
// least 2; it also relies on `unsigned long` being as wide as int64_t - confirm for all
// supported targets.
int64_t SizeBasedCumulativeCompactionPolicy::_level_size(const int64_t size) {
    if (size < 1024) {
        return 0;
    }

    // Index of the highest set bit of _promotion_size / 2.
    const auto cap_bit = sizeof(_promotion_size) * 8 - 1 - __builtin_clzl(_promotion_size / 2);
    const int64_t level_cap = static_cast<int64_t>(1) << cap_bit;
    if (size >= level_cap) {
        return level_cap;
    }

    // Index of the highest set bit of `size` (size >= 1024 here, so it is never zero).
    const auto size_bit = sizeof(size) * 8 - 1 - __builtin_clzl(size);
    return static_cast<int64_t>(1) << size_bit;
}
// Creates the cumulative compaction policy named by `compaction_policy`.
//
// CUMULATIVE_TIME_SERIES_POLICY yields a TimeSeriesCumulativeCompactionPolicy. Every other
// name, including CUMULATIVE_SIZE_BASED_POLICY, yields a default-constructed
// SizeBasedCumulativeCompactionPolicy.
std::shared_ptr<CumulativeCompactionPolicy>
CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(
        const std::string_view& compaction_policy) {
    const bool wants_time_series = (compaction_policy == CUMULATIVE_TIME_SERIES_POLICY);
    if (wants_time_series) {
        return std::make_shared<TimeSeriesCumulativeCompactionPolicy>();
    }
    // The size-based policy is both the explicit choice and the fallback for unknown names.
    return std::make_shared<SizeBasedCumulativeCompactionPolicy>();
}
#include "common/compile_check_end.h"
} // namespace doris