// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "cloud/cloud_tablet.h"
#include <gen_cpp/olap_file.pb.h>
#include <rapidjson/document.h>
#include <rapidjson/encodings.h>
#include <rapidjson/prettywriter.h>
#include <rapidjson/rapidjson.h>
#include <rapidjson/stringbuffer.h>
#include <atomic>
#include <memory>
#include <shared_mutex>
#include <unordered_map>
#include <vector>
#include "cloud/cloud_meta_mgr.h"
#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet_mgr.h"
#include "common/logging.h"
#include "io/cache/block_file_cache_downloader.h"
#include "io/cache/block_file_cache_factory.h"
#include "olap/compaction.h"
#include "olap/cumulative_compaction_time_series_policy.h"
#include "olap/olap_define.h"
#include "olap/rowset/beta_rowset.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_factory.h"
#include "olap/rowset/rowset_fwd.h"
#include "olap/rowset/rowset_writer.h"
#include "olap/rowset/segment_v2/inverted_index_desc.h"
#include "olap/storage_policy.h"
#include "olap/tablet_schema.h"
#include "olap/txn_manager.h"
#include "util/debug_points.h"
#include "vec/common/schema_util.h"
namespace doris {
#include "common/compile_check_begin.h"
using namespace ErrorCode;
static constexpr int LOAD_INITIATOR_ID = -1;
CloudTablet::CloudTablet(CloudStorageEngine& engine, TabletMetaSharedPtr tablet_meta)
: BaseTablet(std::move(tablet_meta)), _engine(engine) {}
CloudTablet::~CloudTablet() = default;
bool CloudTablet::exceed_version_limit(int32_t limit) {
return _approximate_num_rowsets.load(std::memory_order_relaxed) > limit;
}
Status CloudTablet::capture_consistent_rowsets_unlocked(
const Version& spec_version, std::vector<RowsetSharedPtr>* rowsets) const {
Versions version_path;
auto st = _timestamped_version_tracker.capture_consistent_versions(spec_version, &version_path);
if (!st.ok()) {
// If there are no missing versions, the requested version has already been merged (compacted)
auto missed_versions = get_missed_versions(spec_version.second);
if (missed_versions.empty()) {
st.set_code(VERSION_ALREADY_MERGED); // Reset error code
}
st.append(" tablet_id=" + std::to_string(tablet_id()));
return st;
}
VLOG_DEBUG << "capture consitent versions: " << version_path;
return _capture_consistent_rowsets_unlocked(version_path, rowsets);
}
Status CloudTablet::capture_rs_readers(const Version& spec_version,
std::vector<RowSetSplits>* rs_splits,
bool skip_missing_version) {
DBUG_EXECUTE_IF("CloudTablet.capture_rs_readers.return.e-230", {
LOG_WARNING("CloudTablet.capture_rs_readers.return e-230").tag("tablet_id", tablet_id());
return Status::Error<false>(-230, "injected error");
});
Versions version_path;
std::shared_lock rlock(_meta_lock);
auto st = _timestamped_version_tracker.capture_consistent_versions(spec_version, &version_path);
if (!st.ok()) {
rlock.unlock(); // unlock early to avoid logging while holding the lock
// If there are no missing versions, the requested version has already been merged (compacted)
auto missed_versions = get_missed_versions(spec_version.second);
if (missed_versions.empty()) {
st.set_code(VERSION_ALREADY_MERGED); // Reset error code
st.append(" versions are already compacted, ");
}
st.append(" tablet_id=" + std::to_string(tablet_id()));
// clang-format off
LOG(WARNING) << st << '\n' << [this]() { std::string json; get_compaction_status(&json); return json; }();
// clang-format on
return st;
}
VLOG_DEBUG << "capture consitent versions: " << version_path;
return capture_rs_readers_unlocked(version_path, rs_splits);
}
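// Build `_merged_tablet_schema` from the rowset with the highest schema version; when variant
// columns are present, merge all rowset schemas into the least common (wide) schema.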
Status CloudTablet::merge_rowsets_schema() {
// Find the rowset with the highest schema version
auto max_version_rowset =
std::max_element(
_rs_version_map.begin(), _rs_version_map.end(),
[](const auto& a, const auto& b) {
return !a.second->tablet_schema()
? true
: (!b.second->tablet_schema()
? false
: a.second->tablet_schema()->schema_version() <
b.second->tablet_schema()
->schema_version());
})
->second;
TabletSchemaSPtr max_version_schema = max_version_rowset->tablet_schema();
// If the schema has variant columns, perform a merge to create a wide tablet schema
if (max_version_schema->num_variant_columns() > 0) {
std::vector<TabletSchemaSPtr> schemas;
std::transform(_rs_version_map.begin(), _rs_version_map.end(), std::back_inserter(schemas),
[](const auto& rs_meta) { return rs_meta.second->tablet_schema(); });
// Merge the collected schemas to obtain the least common schema
RETURN_IF_ERROR(vectorized::schema_util::get_least_common_schema(schemas, nullptr,
max_version_schema));
VLOG_DEBUG << "dump schema: " << max_version_schema->dump_full_schema();
_merged_tablet_schema = max_version_schema;
}
return Status::OK();
}
// There are only two tablet states in cloud mode: RUNNING and NOT_READY.
// This function erases the tablet from `CloudTabletMgr` when the tablet cannot be found in the meta service (MS).
Status CloudTablet::sync_rowsets(int64_t query_version, bool warmup_delta_data) {
RETURN_IF_ERROR(sync_if_not_running());
if (query_version > 0) {
std::shared_lock rlock(_meta_lock);
if (_max_version >= query_version) {
return Status::OK();
}
}
// serially execute sync to reduce unnecessary network overhead
std::lock_guard lock(_sync_meta_lock);
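// Re-check under the sync lock: another thread may have already synced the required version
// while we were waiting, in which case the RPC can be skipped.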
if (query_version > 0) {
std::shared_lock rlock(_meta_lock);
if (_max_version >= query_version) {
return Status::OK();
}
}
auto st = _engine.meta_mgr().sync_tablet_rowsets(this, warmup_delta_data);
if (st.is<ErrorCode::NOT_FOUND>()) {
clear_cache();
}
return st;
}
// Sync tablet meta and all rowset metas if the tablet is not in RUNNING state.
// This can happen when this BE has not finished a schema change job while another BE has already committed it.
// It should be quite a rare situation.
Status CloudTablet::sync_if_not_running() {
if (tablet_state() == TABLET_RUNNING) {
return Status::OK();
}
// Serially execute sync to reduce unnecessary network overhead
std::lock_guard lock(_sync_meta_lock);
{
std::shared_lock rlock(_meta_lock);
if (tablet_state() == TABLET_RUNNING) {
return Status::OK();
}
}
TabletMetaSharedPtr tablet_meta;
auto st = _engine.meta_mgr().get_tablet_meta(tablet_id(), &tablet_meta);
if (!st.ok()) {
if (st.is<ErrorCode::NOT_FOUND>()) {
clear_cache();
}
return st;
}
if (tablet_meta->tablet_state() != TABLET_RUNNING) [[unlikely]] {
// A merge-on-write (MoW) tablet may reach here when a load happens during schema change
return Status::OK();
}
TimestampedVersionTracker empty_tracker;
{
std::lock_guard wlock(_meta_lock);
RETURN_IF_ERROR(set_tablet_state(TABLET_RUNNING));
_rs_version_map.clear();
_stale_rs_version_map.clear();
std::swap(_timestamped_version_tracker, empty_tracker);
_tablet_meta->clear_rowsets();
_tablet_meta->clear_stale_rowset();
_max_version = -1;
}
st = _engine.meta_mgr().sync_tablet_rowsets(this);
if (st.is<ErrorCode::NOT_FOUND>()) {
clear_cache();
}
return st;
}
TabletSchemaSPtr CloudTablet::merged_tablet_schema() const {
std::shared_lock rlock(_meta_lock);
return _merged_tablet_schema;
}
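// Register `to_add` rowsets in this tablet. With `version_overlap`, rowsets that are already
// present are deduplicated or replace their same-version counterparts, and existing rowsets
// covered by a new version are moved to the stale map; newly added data may additionally be
// warmed up into the file cache in the background.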
void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_overlap,
std::unique_lock<std::shared_mutex>& meta_lock,
bool warmup_delta_data) {
if (to_add.empty()) {
return;
}
auto add_rowsets_directly = [=, this](std::vector<RowsetSharedPtr>& rowsets) {
for (auto& rs : rowsets) {
if (version_overlap || warmup_delta_data) {
#ifndef BE_TEST
// Warmup rowset data in background
for (int seg_id = 0; seg_id < rs->num_segments(); ++seg_id) {
const auto& rowset_meta = rs->rowset_meta();
constexpr int64_t interval = 600; // 10 mins
// When the BE restarts and receives the `load_sync` RPC, it syncs all historical rowsets the first time.
// So filter out old rowsets to avoid downloading the whole table.
if (warmup_delta_data &&
::time(nullptr) - rowset_meta->newest_write_timestamp() >= interval) {
continue;
}
auto storage_resource = rowset_meta->remote_storage_resource();
if (!storage_resource) {
LOG(WARNING) << storage_resource.error();
continue;
}
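// File cache expiration for the warmed-up segment: newest_write_timestamp + tablet TTL seconds,
// or 0 when the tablet has no TTL or the write timestamp is unavailable.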
int64_t expiration_time =
_tablet_meta->ttl_seconds() == 0 ||
rowset_meta->newest_write_timestamp() <= 0
? 0
: rowset_meta->newest_write_timestamp() +
_tablet_meta->ttl_seconds();
_engine.file_cache_block_downloader().submit_download_task(
io::DownloadFileMeta {
.path = storage_resource.value()->remote_segment_path(
*rowset_meta, seg_id),
.file_size = rs->rowset_meta()->segment_file_size(seg_id),
.file_system = storage_resource.value()->fs,
.ctx =
{
.expiration_time = expiration_time,
},
.download_done {},
});
auto download_idx_file = [&](const io::Path& idx_path) {
io::DownloadFileMeta meta {
.path = idx_path,
.file_size = -1,
.file_system = storage_resource.value()->fs,
.ctx =
{
.expiration_time = expiration_time,
},
.download_done {},
};
_engine.file_cache_block_downloader().submit_download_task(std::move(meta));
};
auto schema_ptr = rowset_meta->tablet_schema();
auto idx_version = schema_ptr->get_inverted_index_storage_format();
if (idx_version == InvertedIndexStorageFormatPB::V1) {
for (const auto& index : schema_ptr->inverted_indexes()) {
auto idx_path = storage_resource.value()->remote_idx_v1_path(
*rowset_meta, seg_id, index->index_id(),
index->get_index_suffix());
download_idx_file(idx_path);
}
} else {
if (schema_ptr->has_inverted_index()) {
auto idx_path = storage_resource.value()->remote_idx_v2_path(
*rowset_meta, seg_id);
download_idx_file(idx_path);
}
}
}
#endif
}
_rs_version_map.emplace(rs->version(), rs);
_timestamped_version_tracker.add_version(rs->version());
_max_version = std::max(rs->end_version(), _max_version);
update_base_size(*rs);
}
_tablet_meta->add_rowsets_unchecked(rowsets);
};
if (!version_overlap) {
add_rowsets_directly(to_add);
return;
}
// Filter out rowsets that already exist in the tablet
auto remove_it =
std::remove_if(to_add.begin(), to_add.end(), [this](const RowsetSharedPtr& rs) {
if (auto find_it = _rs_version_map.find(rs->version());
find_it == _rs_version_map.end()) {
return false;
} else if (find_it->second->rowset_id() == rs->rowset_id()) {
return true; // Same rowset
}
// If a rowset in `to_add` has the same version as a rowset in the tablet but a different
// rowset_id, replace the existing rowset with the `to_add` rowset. This may occur when:
// 1. schema change converts rowsets that have been double-written to the new tablet
// 2. cumulative compaction picks a single overlapping input rowset to compact
_tablet_meta->delete_rs_meta_by_version(rs->version(), nullptr);
_rs_version_map[rs->version()] = rs;
_tablet_meta->add_rowsets_unchecked({rs});
update_base_size(*rs);
return true;
});
to_add.erase(remove_it, to_add.end());
// Add the remaining rowsets; delete any existing rowsets whose versions they cover
std::vector<RowsetSharedPtr> to_add_directly;
for (auto& to_add_rs : to_add) {
// existing rowsets whose versions are covered by `to_add_rs`
std::vector<RowsetSharedPtr> to_delete;
Version to_add_v = to_add_rs->version();
if (to_add_v.first > _max_version) {
// start_version > max_version: no overlap is possible, add directly
to_add_directly.push_back(to_add_rs);
} else {
to_add_directly.push_back(to_add_rs);
for (auto& [v, rs] : _rs_version_map) {
if (to_add_v.contains(v)) {
to_delete.push_back(rs);
}
}
delete_rowsets(to_delete, meta_lock);
}
}
add_rowsets_directly(to_add_directly);
}
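// Move the given rowsets from the active version map to the stale map and record them as a
// stale version path; they are cleaned up later by delete_expired_stale_rowsets().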
void CloudTablet::delete_rowsets(const std::vector<RowsetSharedPtr>& to_delete,
std::unique_lock<std::shared_mutex>&) {
if (to_delete.empty()) {
return;
}
std::vector<RowsetMetaSharedPtr> rs_metas;
rs_metas.reserve(to_delete.size());
for (auto&& rs : to_delete) {
rs_metas.push_back(rs->rowset_meta());
_stale_rs_version_map[rs->version()] = rs;
}
_timestamped_version_tracker.add_stale_path_version(rs_metas);
for (auto&& rs : to_delete) {
_rs_version_map.erase(rs->version());
}
_tablet_meta->modify_rs_metas({}, rs_metas, false);
}
uint64_t CloudTablet::delete_expired_stale_rowsets() {
std::vector<RowsetSharedPtr> expired_rowsets;
// ATTN: trick. Use `stale_rowsets` to temporarily increase the reference count of the rowset
// shared pointers in `_stale_rs_version_map`, so that `recycle_cached_data()` can check whether
// the reference count is 2.
std::vector<RowsetSharedPtr> stale_rowsets;
int64_t expired_stale_sweep_endtime =
::time(nullptr) - config::tablet_rowset_stale_sweep_time_sec;
std::vector<std::string> version_to_delete;
{
std::unique_lock wlock(_meta_lock);
std::vector<int64_t> path_ids;
// capture the path version to delete
_timestamped_version_tracker.capture_expired_paths(expired_stale_sweep_endtime, &path_ids);
if (path_ids.empty()) {
return 0;
}
for (int64_t path_id : path_ids) {
int64_t start_version = -1;
int64_t end_version = -1;
// delete stale versions in version graph
auto version_path = _timestamped_version_tracker.fetch_and_delete_path_by_id(path_id);
for (auto& v_ts : version_path->timestamped_versions()) {
auto rs_it = _stale_rs_version_map.find(v_ts->version());
if (rs_it != _stale_rs_version_map.end()) {
expired_rowsets.push_back(rs_it->second);
stale_rowsets.push_back(rs_it->second);
LOG(INFO) << "erase stale rowset, tablet_id=" << tablet_id()
<< " rowset_id=" << rs_it->second->rowset_id().to_string()
<< " version=" << rs_it->first.to_string();
_stale_rs_version_map.erase(rs_it);
} else {
LOG(WARNING) << "cannot find stale rowset " << v_ts->version() << " in tablet "
<< tablet_id();
// clang-format off
DCHECK(false) << [this, &wlock]() { wlock.unlock(); std::string json; get_compaction_status(&json); return json; }();
// clang-format on
}
if (start_version < 0) {
start_version = v_ts->version().first;
}
end_version = v_ts->version().second;
_tablet_meta->delete_stale_rs_meta_by_version(v_ts->version());
}
Version version(start_version, end_version);
version_to_delete.emplace_back(version.to_string());
}
_reconstruct_version_tracker_if_necessary();
}
_tablet_meta->delete_bitmap().remove_stale_delete_bitmap_from_queue(version_to_delete);
recycle_cached_data(expired_rowsets);
return expired_rowsets.size();
}
void CloudTablet::update_base_size(const Rowset& rs) {
// Define base rowset as the rowset of version [2-x]
if (rs.start_version() == 2) {
_base_size = rs.total_disk_size();
}
}
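// Drop all cached data belonging to this tablet's rowsets and remove the tablet from CloudTabletMgr.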
void CloudTablet::clear_cache() {
CloudTablet::recycle_cached_data(get_snapshot_rowset(true));
_engine.tablet_mgr().erase_tablet(tablet_id());
}
void CloudTablet::recycle_cached_data(const std::vector<RowsetSharedPtr>& rowsets) {
for (auto& rs : rowsets) {
// Clear cached opened segments and inverted index cache in memory
rs->clear_cache();
}
if (config::enable_file_cache) {
for (const auto& rs : rowsets) {
// `rowsets` and the tablet's `_rs_version_map` each hold a shared_ptr to the rowset, so at this
// point the reference count of the shared_ptr is at least 2.
if (rs.use_count() > 2) {
LOG(WARNING) << "Rowset " << rs->rowset_id().to_string() << " has "
<< rs.use_count()
<< " references. File Cache won't be recycled when query is using it.";
continue;
}
for (int seg_id = 0; seg_id < rs->num_segments(); ++seg_id) {
// TODO: Segment::file_cache_key
auto file_key = Segment::file_cache_key(rs->rowset_id().to_string(), seg_id);
auto* file_cache = io::FileCacheFactory::instance()->get_by_path(file_key);
file_cache->remove_if_cached_async(file_key);
}
}
}
}
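// Refresh the tablet's approximate statistics. For rowsets at or above the cumulative point,
// an overlapping rowset contributes one delta per segment, a non-overlapping rowset contributes one.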
void CloudTablet::reset_approximate_stats(int64_t num_rowsets, int64_t num_segments,
int64_t num_rows, int64_t data_size) {
_approximate_num_rowsets.store(num_rowsets, std::memory_order_relaxed);
_approximate_num_segments.store(num_segments, std::memory_order_relaxed);
_approximate_num_rows.store(num_rows, std::memory_order_relaxed);
_approximate_data_size.store(data_size, std::memory_order_relaxed);
int64_t cumu_num_deltas = 0;
int64_t cumu_num_rowsets = 0;
auto cp = _cumulative_point.load(std::memory_order_relaxed);
for (auto& [v, r] : _rs_version_map) {
if (v.second < cp) {
continue;
}
cumu_num_deltas += r->is_segments_overlapping() ? r->num_segments() : 1;
++cumu_num_rowsets;
}
_approximate_cumu_num_rowsets.store(cumu_num_rowsets, std::memory_order_relaxed);
_approximate_cumu_num_deltas.store(cumu_num_deltas, std::memory_order_relaxed);
}
Result<std::unique_ptr<RowsetWriter>> CloudTablet::create_rowset_writer(
RowsetWriterContext& context, bool vertical) {
context.rowset_id = _engine.next_rowset_id();
// FIXME(plat1ko): Seems `tablet_id` and `index_id` have been set repeatedly
context.tablet_id = tablet_id();
context.index_id = index_id();
context.partition_id = partition_id();
context.enable_unique_key_merge_on_write = enable_unique_key_merge_on_write();
return RowsetFactory::create_rowset_writer(_engine, context, vertical);
}
// Create a rowset writer with the given rowset_id and seg_id.
// After writing, merge this transient rowset with the original rowset.
Result<std::unique_ptr<RowsetWriter>> CloudTablet::create_transient_rowset_writer(
const Rowset& rowset, std::shared_ptr<PartialUpdateInfo> partial_update_info,
int64_t txn_expiration) {
if (rowset.rowset_meta_state() != RowsetStatePB::BEGIN_PARTIAL_UPDATE &&
rowset.rowset_meta_state() != RowsetStatePB::COMMITTED) [[unlikely]] {
auto msg = fmt::format(
"wrong rowset state when create_transient_rowset_writer, rowset state should be "
"BEGIN_PARTIAL_UPDATE or COMMITTED, but found {}, rowset_id={}, tablet_id={}",
RowsetStatePB_Name(rowset.rowset_meta_state()), rowset.rowset_id().to_string(),
tablet_id());
// See `CloudRowsetWriter::build` for details.
// If this is a retried task, the rowset state may already have been changed to
// RowsetStatePB::COMMITTED by `RowsetMeta::merge_rowset_meta()` in a previous attempt.
LOG(WARNING) << msg;
DCHECK(false) << msg;
}
RowsetWriterContext context;
context.rowset_state = PREPARED;
context.segments_overlap = OVERLAPPING;
// During a partial update, the extracted columns of a variant should not be included in the tablet schema.
// This is because the partial update for a variant needs to ignore the extracted columns.
// Otherwise, the schema types in different rowsets might be inconsistent. When performing a partial update,
// the complete variant is constructed by reading all the sub-columns of the variant.
context.tablet_schema = rowset.tablet_schema()->copy_without_variant_extracted_columns();
context.newest_write_timestamp = UnixSeconds();
context.enable_segcompaction = false;
context.write_type = DataWriteType::TYPE_DIRECT;
context.partial_update_info = std::move(partial_update_info);
context.is_transient_rowset_writer = true;
context.rowset_id = rowset.rowset_id();
context.tablet_id = tablet_id();
context.index_id = index_id();
context.partition_id = partition_id();
context.enable_unique_key_merge_on_write = enable_unique_key_merge_on_write();
context.txn_expiration = txn_expiration;
auto storage_resource = rowset.rowset_meta()->remote_storage_resource();
if (!storage_resource) {
return ResultError(std::move(storage_resource.error()));
}
context.storage_resource = *storage_resource.value();
return RowsetFactory::create_rowset_writer(_engine, context, false)
.transform([&](auto&& writer) {
writer->set_segment_start_id(cast_set<int32_t>(rowset.num_segments()));
return writer;
});
}
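// Base compaction score: the approximate number of rowsets below the cumulative point
// (total rowsets minus cumulative rowsets). Under the time-series policy the score stays 0
// unless some rowset below the cumulative point carries a delete predicate.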
int64_t CloudTablet::get_cloud_base_compaction_score() const {
if (_tablet_meta->compaction_policy() == CUMULATIVE_TIME_SERIES_POLICY) {
bool has_delete = false;
int64_t point = cumulative_layer_point();
std::shared_lock<std::shared_mutex> rlock(_meta_lock);
for (const auto& rs_meta : _tablet_meta->all_rs_metas()) {
if (rs_meta->start_version() >= point) {
continue;
}
if (rs_meta->has_delete_predicate()) {
has_delete = true;
break;
}
}
if (!has_delete) {
return 0;
}
}
return _approximate_num_rowsets.load(std::memory_order_relaxed) -
_approximate_cumu_num_rowsets.load(std::memory_order_relaxed);
}
int64_t CloudTablet::get_cloud_cumu_compaction_score() const {
// TODO(plat1ko): Propose an algorithm that considers the tablet's key type, the number of delete
// rowsets, and the number of tablet versions simultaneously.
return _approximate_cumu_num_deltas.load(std::memory_order_relaxed);
}
// Write a JSON string describing the compaction status of this tablet to `json_result`
void CloudTablet::get_compaction_status(std::string* json_result) {
rapidjson::Document root;
root.SetObject();
rapidjson::Document path_arr;
path_arr.SetArray();
std::vector<RowsetSharedPtr> rowsets;
std::vector<RowsetSharedPtr> stale_rowsets;
{
std::shared_lock rdlock(_meta_lock);
rowsets.reserve(_rs_version_map.size());
for (auto& it : _rs_version_map) {
rowsets.push_back(it.second);
}
stale_rowsets.reserve(_stale_rs_version_map.size());
for (auto& it : _stale_rs_version_map) {
stale_rowsets.push_back(it.second);
}
}
std::sort(rowsets.begin(), rowsets.end(), Rowset::comparator);
std::sort(stale_rowsets.begin(), stale_rowsets.end(), Rowset::comparator);
// get snapshot version path json_doc
_timestamped_version_tracker.get_stale_version_path_json_doc(path_arr);
root.AddMember("cumulative point", _cumulative_point.load(), root.GetAllocator());
// print all rowsets' version as an array
rapidjson::Document versions_arr;
rapidjson::Document missing_versions_arr;
versions_arr.SetArray();
missing_versions_arr.SetArray();
int64_t last_version = -1;
for (auto& rowset : rowsets) {
const Version& ver = rowset->version();
if (ver.first != last_version + 1) {
rapidjson::Value miss_value;
miss_value.SetString(fmt::format("[{}-{}]", last_version + 1, ver.first - 1).c_str(),
missing_versions_arr.GetAllocator());
missing_versions_arr.PushBack(miss_value, missing_versions_arr.GetAllocator());
}
rapidjson::Value value;
std::string version_str = rowset->get_rowset_info_str();
value.SetString(version_str.c_str(), cast_set<uint32_t>(version_str.length()),
versions_arr.GetAllocator());
versions_arr.PushBack(value, versions_arr.GetAllocator());
last_version = ver.second;
}
root.AddMember("rowsets", versions_arr, root.GetAllocator());
root.AddMember("missing_rowsets", missing_versions_arr, root.GetAllocator());
// print all stale rowsets' version as an array
rapidjson::Document stale_versions_arr;
stale_versions_arr.SetArray();
for (auto& rowset : stale_rowsets) {
rapidjson::Value value;
std::string version_str = rowset->get_rowset_info_str();
value.SetString(version_str.c_str(), cast_set<uint32_t>(version_str.length()),
stale_versions_arr.GetAllocator());
stale_versions_arr.PushBack(value, stale_versions_arr.GetAllocator());
}
root.AddMember("stale_rowsets", stale_versions_arr, root.GetAllocator());
// add stale version rowsets
root.AddMember("stale version path", path_arr, root.GetAllocator());
// to json string
rapidjson::StringBuffer strbuf;
rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(strbuf);
root.Accept(writer);
*json_result = std::string(strbuf.GetString());
}
void CloudTablet::set_cumulative_layer_point(int64_t new_point) {
if (new_point == Tablet::K_INVALID_CUMULATIVE_POINT || new_point >= _cumulative_point) {
_cumulative_point = new_point;
return;
}
// cumulative point should only be reset to -1, or be increased
// FIXME: could happen in currently unresolved race conditions
LOG(WARNING) << "Unexpected cumulative point: " << new_point
<< ", origin: " << _cumulative_point.load();
}
std::vector<RowsetSharedPtr> CloudTablet::pick_candidate_rowsets_to_base_compaction() {
std::vector<RowsetSharedPtr> candidate_rowsets;
{
std::shared_lock rlock(_meta_lock);
for (const auto& [version, rs] : _rs_version_map) {
if (version.first != 0 && version.first < _cumulative_point &&
(_alter_version == -1 || version.second <= _alter_version)) {
candidate_rowsets.push_back(rs);
}
}
}
std::sort(candidate_rowsets.begin(), candidate_rowsets.end(), Rowset::comparator);
return candidate_rowsets;
}
std::vector<RowsetSharedPtr> CloudTablet::pick_candidate_rowsets_to_full_compaction() {
std::vector<RowsetSharedPtr> candidate_rowsets;
{
std::shared_lock rlock(_meta_lock);
for (auto& [v, rs] : _rs_version_map) {
// MUST NOT compact rowset [0-1] for some historical reasons (see cloud_schema_change)
if (v.first != 0) {
candidate_rowsets.push_back(rs);
}
}
}
std::sort(candidate_rowsets.begin(), candidate_rowsets.end(), Rowset::comparator);
return candidate_rowsets;
}
CalcDeleteBitmapExecutor* CloudTablet::calc_delete_bitmap_executor() {
return _engine.calc_delete_bitmap_executor();
}
Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t txn_id,
DeleteBitmapPtr delete_bitmap, RowsetWriter* rowset_writer,
const RowsetIdUnorderedSet& cur_rowset_ids,
int64_t lock_id) {
RowsetSharedPtr rowset = txn_info->rowset;
int64_t cur_version = rowset->start_version();
// update delete bitmap info, in order to avoid recalculation when trying again
RETURN_IF_ERROR(_engine.txn_delete_bitmap_cache().update_tablet_txn_info(
txn_id, tablet_id(), delete_bitmap, cur_rowset_ids, PublishStatus::PREPARE));
if (txn_info->partial_update_info && txn_info->partial_update_info->is_partial_update() &&
rowset_writer->num_rows() > 0) {
DBUG_EXECUTE_IF("CloudTablet::save_delete_bitmap.update_tmp_rowset.error", {
return Status::InternalError<false>("injected update_tmp_rowset error.");
});
const auto& rowset_meta = rowset->rowset_meta();
RETURN_IF_ERROR(_engine.meta_mgr().update_tmp_rowset(*rowset_meta));
}
RETURN_IF_ERROR(save_delete_bitmap_to_ms(cur_version, txn_id, delete_bitmap, lock_id));
// Store the delete bitmap with sentinel marks in txn_delete_bitmap_cache: if the txn is retried
// for some reason, the cached delete bitmap is reused when re-calculating the delete bitmap, during
// which a delete bitmap correctness check is performed. Storing new_delete_bitmap instead would
// make that correctness check fail.
RETURN_IF_ERROR(_engine.txn_delete_bitmap_cache().update_tablet_txn_info(
txn_id, tablet_id(), delete_bitmap, cur_rowset_ids, PublishStatus::SUCCEED,
txn_info->publish_info));
DBUG_EXECUTE_IF("CloudTablet::save_delete_bitmap.enable_sleep", {
auto sleep_sec = dp->param<int>("sleep", 5);
std::this_thread::sleep_for(std::chrono::seconds(sleep_sec));
});
DBUG_EXECUTE_IF("CloudTablet::save_delete_bitmap.injected_error", {
auto retry = dp->param<bool>("retry", false);
if (retry) { // return DELETE_BITMAP_LOCK_ERROR to let it retry
return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR>(
"injected DELETE_BITMAP_LOCK_ERROR");
} else {
return Status::InternalError<false>("injected non-retryable error");
}
});
return Status::OK();
}
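// Rewrite the delete bitmap keys to `cur_version`, skipping sentinel marks, and persist the result
// to the meta service; the txn_id is used as the lock id when no explicit lock id (-1) is given.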
Status CloudTablet::save_delete_bitmap_to_ms(int64_t cur_version, int64_t txn_id,
DeleteBitmapPtr delete_bitmap, int64_t lock_id) {
DeleteBitmapPtr new_delete_bitmap = std::make_shared<DeleteBitmap>(tablet_id());
for (auto iter = delete_bitmap->delete_bitmap.begin();
iter != delete_bitmap->delete_bitmap.end(); ++iter) {
// skip sentinel mark, which is used for delete bitmap correctness check
if (std::get<1>(iter->first) != DeleteBitmap::INVALID_SEGMENT_ID) {
new_delete_bitmap->merge(
{std::get<0>(iter->first), std::get<1>(iter->first), cur_version},
iter->second);
}
}
auto ms_lock_id = lock_id == -1 ? txn_id : lock_id;
RETURN_IF_ERROR(_engine.meta_mgr().update_delete_bitmap(*this, ms_lock_id, LOAD_INITIATOR_ID,
new_delete_bitmap.get()));
return Status::OK();
}
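// Compute the version ranges in [0, spec_version] that are not covered by `existing_versions`.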
Versions CloudTablet::calc_missed_versions(int64_t spec_version, Versions existing_versions) const {
DCHECK(spec_version > 0) << "invalid spec_version: " << spec_version;
// sort the existing versions in ascending order
std::sort(existing_versions.begin(), existing_versions.end(),
[](const Version& a, const Version& b) {
// comparing start versions is enough because two versions never overlap
return a.first < b.first;
});
// Starting from the first version (=0), find the missing versions up to spec_version
int64_t last_version = -1;
Versions missed_versions;
for (const Version& version : existing_versions) {
if (version.first > last_version + 1) {
// there is a hole between versions
missed_versions.emplace_back(last_version + 1, std::min(version.first, spec_version));
}
last_version = version.second;
if (last_version >= spec_version) {
break;
}
}
if (last_version < spec_version) {
// there is a hole between the last version and the specified version.
missed_versions.emplace_back(last_version + 1, spec_version);
}
return missed_versions;
}
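// Compute the delete bitmap that compaction must apply to the output rowset: first for historical
// data up to the current max version, then (after taking the delete bitmap update lock and
// re-syncing rowsets) for data committed while compaction was running, and finally persist it to
// the meta service.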
Status CloudTablet::calc_delete_bitmap_for_compaction(
const std::vector<RowsetSharedPtr>& input_rowsets, const RowsetSharedPtr& output_rowset,
const RowIdConversion& rowid_conversion, ReaderType compaction_type, int64_t merged_rows,
int64_t filtered_rows, int64_t initiator, DeleteBitmapPtr& output_rowset_delete_bitmap,
bool allow_delete_in_cumu_compaction) {
output_rowset_delete_bitmap = std::make_shared<DeleteBitmap>(tablet_id());
std::unique_ptr<RowLocationSet> missed_rows;
if ((config::enable_missing_rows_correctness_check ||
config::enable_mow_compaction_correctness_check_core) &&
!allow_delete_in_cumu_compaction &&
compaction_type == ReaderType::READER_CUMULATIVE_COMPACTION) {
missed_rows = std::make_unique<RowLocationSet>();
LOG(INFO) << "RowLocation Set inited succ for tablet:" << tablet_id();
}
std::unique_ptr<std::map<RowsetSharedPtr, RowLocationPairList>> location_map;
if (config::enable_rowid_conversion_correctness_check &&
tablet_schema()->cluster_key_uids().empty()) {
location_map = std::make_unique<std::map<RowsetSharedPtr, RowLocationPairList>>();
LOG(INFO) << "Location Map inited succ for tablet:" << tablet_id();
}
// 1. calc delete bitmap for historical data
RETURN_IF_ERROR(_engine.meta_mgr().sync_tablet_rowsets(this));
Version version = max_version();
std::size_t missed_rows_size = 0;
calc_compaction_output_rowset_delete_bitmap(
input_rowsets, rowid_conversion, 0, version.second + 1, missed_rows.get(),
location_map.get(), tablet_meta()->delete_bitmap(), output_rowset_delete_bitmap.get());
if (missed_rows) {
missed_rows_size = missed_rows->size();
if (!allow_delete_in_cumu_compaction) {
if (compaction_type == ReaderType::READER_CUMULATIVE_COMPACTION &&
tablet_state() == TABLET_RUNNING) {
if (merged_rows + filtered_rows >= 0 &&
merged_rows + filtered_rows != missed_rows_size) {
std::string err_msg = fmt::format(
"cumulative compaction: the merged rows({}), the filtered rows({}) is "
"not equal to missed rows({}) in rowid conversion, tablet_id: {}, "
"table_id:{}",
merged_rows, filtered_rows, missed_rows_size, tablet_id(), table_id());
if (config::enable_mow_compaction_correctness_check_core) {
CHECK(false) << err_msg;
} else {
DCHECK(false) << err_msg;
}
LOG(WARNING) << err_msg;
}
}
}
}
if (location_map) {
RETURN_IF_ERROR(check_rowid_conversion(output_rowset, *location_map));
location_map->clear();
}
// 2. calc delete bitmap for incremental data
int64_t t1 = MonotonicMicros();
RETURN_IF_ERROR(_engine.meta_mgr().get_delete_bitmap_update_lock(
*this, COMPACTION_DELETE_BITMAP_LOCK_ID, initiator));
int64_t t2 = MonotonicMicros();
RETURN_IF_ERROR(_engine.meta_mgr().sync_tablet_rowsets(this));
int64_t t3 = MonotonicMicros();
calc_compaction_output_rowset_delete_bitmap(
input_rowsets, rowid_conversion, version.second, UINT64_MAX, missed_rows.get(),
location_map.get(), tablet_meta()->delete_bitmap(), output_rowset_delete_bitmap.get());
int64_t t4 = MonotonicMicros();
if (location_map) {
RETURN_IF_ERROR(check_rowid_conversion(output_rowset, *location_map));
}
int64_t t5 = MonotonicMicros();
if (missed_rows) {
DCHECK_EQ(missed_rows->size(), missed_rows_size);
if (missed_rows->size() != missed_rows_size) {
LOG(WARNING) << "missed rows don't match, before: " << missed_rows_size
<< " after: " << missed_rows->size();
}
}
// 3. store delete bitmap
auto st = _engine.meta_mgr().update_delete_bitmap(*this, -1, initiator,
output_rowset_delete_bitmap.get());
int64_t t6 = MonotonicMicros();
LOG(INFO) << "calc_delete_bitmap_for_compaction, tablet_id=" << tablet_id()
<< ", get lock cost " << (t2 - t1) << " us, sync rowsets cost " << (t3 - t2)
<< " us, calc delete bitmap cost " << (t4 - t3) << " us, check rowid conversion cost "
<< (t5 - t4) << " us, store delete bitmap cost " << (t6 - t5)
<< " us, st=" << st.to_string();
return st;
}
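// Pull the latest tablet meta from the meta service and apply meta-only changes locally: TTL
// (also updating the expiration time of segment files already in the file cache), the compaction
// policy, and the time-series compaction thresholds.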
Status CloudTablet::sync_meta() {
if (!config::enable_file_cache) {
return Status::OK();
}
TabletMetaSharedPtr tablet_meta;
auto st = _engine.meta_mgr().get_tablet_meta(tablet_id(), &tablet_meta);
if (!st.ok()) {
if (st.is<ErrorCode::NOT_FOUND>()) {
clear_cache();
}
return st;
}
auto new_ttl_seconds = tablet_meta->ttl_seconds();
if (_tablet_meta->ttl_seconds() != new_ttl_seconds) {
_tablet_meta->set_ttl_seconds(new_ttl_seconds);
int64_t cur_time = UnixSeconds();
std::shared_lock rlock(_meta_lock);
for (auto& [_, rs] : _rs_version_map) {
for (int seg_id = 0; seg_id < rs->num_segments(); ++seg_id) {
int64_t new_expiration_time =
new_ttl_seconds + rs->rowset_meta()->newest_write_timestamp();
new_expiration_time = new_expiration_time > cur_time ? new_expiration_time : 0;
auto file_key = Segment::file_cache_key(rs->rowset_id().to_string(), seg_id);
auto* file_cache = io::FileCacheFactory::instance()->get_by_path(file_key);
file_cache->modify_expiration_time(file_key, new_expiration_time);
}
}
}
auto new_compaction_policy = tablet_meta->compaction_policy();
if (_tablet_meta->compaction_policy() != new_compaction_policy) {
_tablet_meta->set_compaction_policy(new_compaction_policy);
}
auto new_time_series_compaction_goal_size_mbytes =
tablet_meta->time_series_compaction_goal_size_mbytes();
if (_tablet_meta->time_series_compaction_goal_size_mbytes() !=
new_time_series_compaction_goal_size_mbytes) {
_tablet_meta->set_time_series_compaction_goal_size_mbytes(
new_time_series_compaction_goal_size_mbytes);
}
auto new_time_series_compaction_file_count_threshold =
tablet_meta->time_series_compaction_file_count_threshold();
if (_tablet_meta->time_series_compaction_file_count_threshold() !=
new_time_series_compaction_file_count_threshold) {
_tablet_meta->set_time_series_compaction_file_count_threshold(
new_time_series_compaction_file_count_threshold);
}
auto new_time_series_compaction_time_threshold_seconds =
tablet_meta->time_series_compaction_time_threshold_seconds();
if (_tablet_meta->time_series_compaction_time_threshold_seconds() !=
new_time_series_compaction_time_threshold_seconds) {
_tablet_meta->set_time_series_compaction_time_threshold_seconds(
new_time_series_compaction_time_threshold_seconds);
}
auto new_time_series_compaction_empty_rowsets_threshold =
tablet_meta->time_series_compaction_empty_rowsets_threshold();
if (_tablet_meta->time_series_compaction_empty_rowsets_threshold() !=
new_time_series_compaction_empty_rowsets_threshold) {
_tablet_meta->set_time_series_compaction_empty_rowsets_threshold(
new_time_series_compaction_empty_rowsets_threshold);
}
auto new_time_series_compaction_level_threshold =
tablet_meta->time_series_compaction_level_threshold();
if (_tablet_meta->time_series_compaction_level_threshold() !=
new_time_series_compaction_level_threshold) {
_tablet_meta->set_time_series_compaction_level_threshold(
new_time_series_compaction_level_threshold);
}
return Status::OK();
}
void CloudTablet::build_tablet_report_info(TTabletInfo* tablet_info) {
std::shared_lock rdlock(_meta_lock);
tablet_info->__set_total_version_count(_tablet_meta->version_count());
tablet_info->__set_tablet_id(_tablet_meta->tablet_id());
// Currently, this information will not be used by the cloud report,
// but it may be used in the future.
}
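// Compare the cardinality of the freshly calculated delete bitmap with the one cached for this txn
// in txn_delete_bitmap_cache; a mismatch returns an internal error.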
Status CloudTablet::check_delete_bitmap_cache(int64_t txn_id,
DeleteBitmap* expected_delete_bitmap) {
DeleteBitmapPtr cached_delete_bitmap;
CloudStorageEngine& engine = ExecEnv::GetInstance()->storage_engine().to_cloud();
Status st = engine.txn_delete_bitmap_cache().get_delete_bitmap(
txn_id, tablet_id(), &cached_delete_bitmap, nullptr, nullptr);
if (st.ok()) {
bool res = (expected_delete_bitmap->cardinality() == cached_delete_bitmap->cardinality());
auto msg = fmt::format(
"delete bitmap cache check failed, cur_cardinality={}, cached_cardinality={}"
"txn_id={}, tablet_id={}",
expected_delete_bitmap->cardinality(), cached_delete_bitmap->cardinality(), txn_id,
tablet_id());
if (!res) {
DCHECK(res) << msg;
return Status::InternalError<false>(msg);
}
}
return Status::OK();
}
#include "common/compile_check_end.h"
} // namespace doris