// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "cloud/cloud_tablet_mgr.h"
#include <bthread/countdown_event.h>
#include "cloud/cloud_meta_mgr.h"
#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet.h"
#include "cloud/config.h"
#include "common/status.h"
#include "olap/lru_cache.h"
#include "runtime/memory/cache_policy.h"
namespace doris {
uint64_t g_tablet_report_inactive_duration_ms = 0;
namespace {
// Ported from
// https://github.com/golang/groupcache/blob/master/singleflight/singleflight.go
template <typename Key, typename Val>
class SingleFlight {
public:
SingleFlight() = default;
SingleFlight(const SingleFlight&) = delete;
void operator=(const SingleFlight&) = delete;
using Loader = std::function<Val(const Key&)>;
// `load` executes `loader` and returns its result, making sure that only
// one execution is in-flight for a given key at a time. If a duplicate
// call comes in, the duplicate caller waits for the original to complete
// and receives the same result.
Val load(const Key& key, Loader loader) {
std::unique_lock lock(_call_map_mtx);
auto it = _call_map.find(key);
if (it != _call_map.end()) {
auto call = it->second;
lock.unlock();
if (int ec = call->event.wait(); ec != 0) {
throw std::system_error(std::error_code(ec, std::system_category()),
"CountdownEvent wait failed");
}
return call->val;
}
auto call = std::make_shared<Call>();
_call_map.emplace(key, call);
lock.unlock();
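// NOTE: `loader` is assumed not to throw. If it did, the event would never be signaled
// and the entry would stay in `_call_map`, leaving waiters on this key blocked forever.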
call->val = loader(key);
call->event.signal();
lock.lock();
_call_map.erase(key);
lock.unlock();
return call->val;
}
private:
// `Call` is an in-flight or completed `load` call
struct Call {
bthread::CountdownEvent event;
Val val;
};
std::mutex _call_map_mtx;
std::unordered_map<Key, std::shared_ptr<Call>> _call_map;
};
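// Usage sketch (hypothetical key and loader, for illustration only):
//   SingleFlight<std::string, int> sf;
//   int v = sf.load("answer", [](const std::string&) { return 42; });
// Concurrent `load` calls that pass the same key share a single loader invocation.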
SingleFlight<int64_t /* tablet_id */, std::shared_ptr<CloudTablet>> s_singleflight_load_tablet;
} // namespace
// tablet_id -> cached tablet
// This map owns all cached tablets. A tablet's lifetime may be longer than that of its LRU handle.
// It's also used for scenarios where users want to access the tablet by `tablet_id` without changing the LRU order.
// TODO(plat1ko): multi shard to increase concurrency
class CloudTabletMgr::TabletMap {
public:
void put(std::shared_ptr<CloudTablet> tablet) {
std::lock_guard lock(_mtx);
_map[tablet->tablet_id()] = std::move(tablet);
}
void erase(CloudTablet* tablet) {
std::lock_guard lock(_mtx);
auto it = _map.find(tablet->tablet_id());
// According to the implementation of `LRUCache`, the `deleter` may be called after a tablet
// with the same tablet id has been inserted into the cache and `TabletMap`. So we MUST check
// that the tablet instance to be erased is the same one stored in the map.
if (it != _map.end() && it->second.get() == tablet) {
_map.erase(it);
}
}
std::shared_ptr<CloudTablet> get(int64_t tablet_id) {
std::lock_guard lock(_mtx);
if (auto it = _map.find(tablet_id); it != _map.end()) {
return it->second;
}
return nullptr;
}
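// NOTE: not synchronized; callers in this file only use the result as a sizing hint for reserve().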
size_t size() { return _map.size(); }
void traverse(std::function<void(const std::shared_ptr<CloudTablet>&)> visitor) {
std::lock_guard lock(_mtx);
for (auto& [_, tablet] : _map) {
visitor(tablet);
}
}
private:
std::mutex _mtx;
std::unordered_map<int64_t, std::shared_ptr<CloudTablet>> _map;
};
// TODO(plat1ko): Prune cache
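// Each cache entry has a charge of 1 (LRUCacheType::NUMBER), so `config::tablet_cache_capacity`
// bounds the number of cached tablets rather than their memory footprint.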
CloudTabletMgr::CloudTabletMgr(CloudStorageEngine& engine)
: _engine(engine),
_tablet_map(std::make_unique<TabletMap>()),
_cache(std::make_unique<LRUCachePolicy>(
CachePolicy::CacheType::CLOUD_TABLET_CACHE, config::tablet_cache_capacity,
LRUCacheType::NUMBER, 0, config::tablet_cache_shards)) {}
CloudTabletMgr::~CloudTabletMgr() = default;
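// Refresh the tablet's last access timestamp. It is updated whenever a tablet is handed out
// or released by `get_tablet`, and `build_all_report_tablets_info` uses it to skip tablets
// that have been accessed recently.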
void set_tablet_access_time_ms(CloudTablet* tablet) {
using namespace std::chrono;
int64_t now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
tablet->last_access_time_ms = now;
}
Result<std::shared_ptr<CloudTablet>> CloudTabletMgr::get_tablet(int64_t tablet_id, bool warmup_data,
bool sync_delete_bitmap) {
// LRU value type. `Value`'s lifetime MUST NOT be longer than `CloudTabletMgr`
class Value : public LRUCacheValueBase {
public:
Value(const std::shared_ptr<CloudTablet>& tablet, TabletMap& tablet_map)
: tablet(tablet), tablet_map(tablet_map) {}
~Value() override { tablet_map.erase(tablet.get()); }
// FIXME(plat1ko): The ownership of the tablet seems to belong to `TabletMap`, while `Value`
// only needs to hold a reference.
std::shared_ptr<CloudTablet> tablet;
TabletMap& tablet_map;
};
auto tablet_id_str = std::to_string(tablet_id);
CacheKey key(tablet_id_str);
auto* handle = _cache->lookup(key);
if (handle == nullptr) {
auto load_tablet = [this, &key, warmup_data,
sync_delete_bitmap](int64_t tablet_id) -> std::shared_ptr<CloudTablet> {
TabletMetaSharedPtr tablet_meta;
auto st = _engine.meta_mgr().get_tablet_meta(tablet_id, &tablet_meta);
if (!st.ok()) {
LOG(WARNING) << "failed to tablet " << tablet_id << ": " << st;
return nullptr;
}
auto tablet = std::make_shared<CloudTablet>(_engine, std::move(tablet_meta));
auto value = std::make_unique<Value>(tablet, *_tablet_map);
// MUST sync stats to let compaction scheduler work correctly
st = _engine.meta_mgr().sync_tablet_rowsets(tablet.get(), warmup_data,
sync_delete_bitmap);
if (!st.ok()) {
LOG(WARNING) << "failed to sync tablet " << tablet_id << ": " << st;
return nullptr;
}
auto* handle = _cache->insert(key, value.release(), 1, sizeof(CloudTablet),
CachePriority::NORMAL);
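// Hand out a shared_ptr whose deleter releases the LRU handle, so the cache entry stays
// pinned for as long as any caller holds the returned tablet.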
auto ret =
std::shared_ptr<CloudTablet>(tablet.get(), [this, handle](CloudTablet* tablet) {
set_tablet_access_time_ms(tablet);
_cache->release(handle);
});
_tablet_map->put(std::move(tablet));
return ret;
};
auto tablet = s_singleflight_load_tablet.load(tablet_id, std::move(load_tablet));
if (tablet == nullptr) {
return ResultError(Status::InternalError("failed to get tablet {}", tablet_id));
}
set_tablet_access_time_ms(tablet.get());
return tablet;
}
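// Cache hit: the looked-up handle pins the entry; wrap the cached tablet with the same
// handle-releasing deleter as above.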
CloudTablet* tablet_raw_ptr = reinterpret_cast<Value*>(_cache->value(handle))->tablet.get();
set_tablet_access_time_ms(tablet_raw_ptr);
auto tablet = std::shared_ptr<CloudTablet>(tablet_raw_ptr, [this, handle](CloudTablet* tablet) {
set_tablet_access_time_ms(tablet);
_cache->release(handle);
});
return tablet;
}
void CloudTabletMgr::erase_tablet(int64_t tablet_id) {
auto tablet_id_str = std::to_string(tablet_id);
CacheKey key(tablet_id_str.data(), tablet_id_str.size());
_cache->erase(key);
}
void CloudTabletMgr::vacuum_stale_rowsets(const CountDownLatch& stop_latch) {
LOG_INFO("begin to vacuum stale rowsets");
std::vector<std::shared_ptr<CloudTablet>> tablets_to_vacuum;
tablets_to_vacuum.reserve(_tablet_map->size());
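// Snapshot the candidate tablets first so that `_tablet_map`'s lock is not held while
// expired rowsets are being deleted.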
_tablet_map->traverse([&tablets_to_vacuum](auto&& t) {
if (t->has_stale_rowsets()) {
tablets_to_vacuum.push_back(t);
}
});
int num_vacuumed = 0;
for (auto& t : tablets_to_vacuum) {
if (stop_latch.count() <= 0) {
break;
}
num_vacuumed += t->delete_expired_stale_rowsets();
}
LOG_INFO("finish vacuum stale rowsets")
.tag("num_vacuumed", num_vacuumed)
.tag("num_tablets", tablets_to_vacuum.size());
}
std::vector<std::weak_ptr<CloudTablet>> CloudTabletMgr::get_weak_tablets() {
std::vector<std::weak_ptr<CloudTablet>> weak_tablets;
weak_tablets.reserve(_tablet_map->size());
_tablet_map->traverse([&weak_tablets](auto& t) { weak_tablets.push_back(t); });
return weak_tablets;
}
void CloudTabletMgr::sync_tablets(const CountDownLatch& stop_latch) {
LOG_INFO("begin to sync tablets");
int64_t last_sync_time_bound = ::time(nullptr) - config::tablet_sync_interval_s;
auto weak_tablets = get_weak_tablets();
// sort by last_sync_time
static auto cmp = [](const auto& a, const auto& b) { return a.first < b.first; };
std::multiset<std::pair<int64_t, std::weak_ptr<CloudTablet>>, decltype(cmp)>
sync_time_tablet_set(cmp);
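// Collect tablets whose metadata has not been synced within `tablet_sync_interval_s`,
// ordered oldest-first so the most stale tablets are synced first.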
for (auto& weak_tablet : weak_tablets) {
if (auto tablet = weak_tablet.lock()) {
int64_t last_sync_time = tablet->last_sync_time_s;
if (last_sync_time <= last_sync_time_bound) {
sync_time_tablet_set.emplace(last_sync_time, weak_tablet);
}
}
}
int num_sync = 0;
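// `last_sync_time_s` is re-checked below because another thread may have synced the tablet
// since it was collected.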
for (auto&& [_, weak_tablet] : sync_time_tablet_set) {
if (stop_latch.count() <= 0) {
break;
}
if (auto tablet = weak_tablet.lock()) {
if (tablet->last_sync_time_s > last_sync_time_bound) {
continue;
}
++num_sync;
auto st = tablet->sync_meta();
if (!st) {
LOG_WARNING("failed to sync tablet meta {}", tablet->tablet_id()).error(st);
if (st.is<ErrorCode::NOT_FOUND>()) {
continue;
}
}
st = tablet->sync_rowsets(-1);
if (!st) {
LOG_WARNING("failed to sync tablet rowsets {}", tablet->tablet_id()).error(st);
}
}
}
LOG_INFO("finish sync tablets").tag("num_sync", num_sync);
}
Status CloudTabletMgr::get_topn_tablets_to_compact(
int n, CompactionType compaction_type, const std::function<bool(CloudTablet*)>& filter_out,
std::vector<std::shared_ptr<CloudTablet>>* tablets, int64_t* max_score) {
DCHECK(compaction_type == CompactionType::BASE_COMPACTION ||
compaction_type == CompactionType::CUMULATIVE_COMPACTION);
*max_score = 0;
int64_t max_score_tablet_id = 0;
// clang-format off
auto score = [compaction_type](CloudTablet* t) {
return compaction_type == CompactionType::BASE_COMPACTION ? t->get_cloud_base_compaction_score()
: compaction_type == CompactionType::CUMULATIVE_COMPACTION ? t->get_cloud_cumu_compaction_score()
: 0;
};
using namespace std::chrono;
auto now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
auto skip = [now, compaction_type](CloudTablet* t) {
if (compaction_type == CompactionType::BASE_COMPACTION) {
return now - t->last_base_compaction_success_time_ms < config::base_compaction_freeze_interval_s * 1000;
}
// If a tablet has too many rowsets but has not been compacted for a long time, compaction
// should be performed regardless of whether there has been a recent load job.
return now - t->last_cumu_no_suitable_version_ms < config::min_compaction_failure_interval_ms ||
(now - t->last_load_time_ms > config::cu_compaction_freeze_interval_s * 1000
&& now - t->last_cumu_compaction_success_time_ms < config::cumu_compaction_interval_s * 1000
&& t->fetch_add_approximate_num_rowsets(0) < config::max_tablet_version_num / 2);
};
// We don't schedule tablets that are disabled for compaction
auto disable = [](CloudTablet* t) { return t->tablet_meta()->tablet_schema()->disable_auto_compaction(); };
auto [num_filtered, num_disabled, num_skipped] = std::make_tuple(0, 0, 0);
auto weak_tablets = get_weak_tablets();
std::vector<std::pair<std::shared_ptr<CloudTablet>, int64_t>> buf;
buf.reserve(n + 1);
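// Single pass over all tablets: keep at most n candidates in `buf`, sorted by score in
// descending order, dropping the lowest-scored one whenever the buffer exceeds n.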
for (auto& weak_tablet : weak_tablets) {
auto t = weak_tablet.lock();
if (t == nullptr) { continue; }
int64_t s = score(t.get());
if (s <= 0) { continue; }
if (s > *max_score) {
max_score_tablet_id = t->tablet_id();
*max_score = s;
}
if (filter_out(t.get())) { ++num_filtered; continue; }
if (disable(t.get())) { ++num_disabled; continue; }
if (skip(t.get())) { ++num_skipped; continue; }
buf.emplace_back(std::move(t), s);
std::sort(buf.begin(), buf.end(), [](auto& a, auto& b) { return a.second > b.second; });
if (buf.size() > n) { buf.pop_back(); }
}
LOG_EVERY_N(INFO, 1000) << "get_topn_compaction_score, n=" << n << " type=" << compaction_type
<< " num_tablets=" << weak_tablets.size() << " num_skipped=" << num_skipped
<< " num_disabled=" << num_disabled << " num_filtered=" << num_filtered
<< " max_score=" << *max_score << " max_score_tablet=" << max_score_tablet_id
<< " tablets=[" << [&buf] { std::stringstream ss; for (auto& i : buf) ss << i.first->tablet_id() << ":" << i.second << ","; return ss.str(); }() << "]"
;
// clang-format on
tablets->clear();
tablets->reserve(n + 1);
for (auto& [t, _] : buf) {
tablets->emplace_back(std::move(t));
}
return Status::OK();
}
void CloudTabletMgr::build_all_report_tablets_info(std::map<TTabletId, TTablet>* tablets_info,
uint64_t* tablet_num) {
DCHECK(tablets_info != nullptr);
VLOG_NOTICE << "begin to build all report cloud tablets info";
HistogramStat tablet_version_num_hist;
auto handler = [&](const std::weak_ptr<CloudTablet>& tablet_wk) {
auto tablet = tablet_wk.lock();
if (!tablet) return;
(*tablet_num)++;
TTabletInfo tablet_info;
tablet->build_tablet_report_info(&tablet_info);
using namespace std::chrono;
int64_t now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
if (now - g_tablet_report_inactive_duration_ms * 1000 < tablet->last_access_time_ms) {
// the tablet has been accessed recently, so do not report it
return;
}
auto& t_tablet = (*tablets_info)[tablet->tablet_id()];
// In cloud mode, a BE holds at most one replica of a tablet; the per-tablet replica list
// below exists only to reuse the non-cloud report protocol.
tablet_version_num_hist.add(tablet_info.total_version_count);
t_tablet.tablet_infos.emplace_back(std::move(tablet_info));
};
auto weak_tablets = get_weak_tablets();
std::for_each(weak_tablets.begin(), weak_tablets.end(), handler);
DorisMetrics::instance()->tablet_version_num_distribution->set_histogram(
tablet_version_num_hist);
LOG(INFO) << "success to build all cloud report tablets info. all_tablet_count=" << *tablet_num
<< " exceed drop time limit count=" << tablets_info->size();
}
void CloudTabletMgr::get_tablet_info(int64_t num_tablets, std::vector<TabletInfo>* tablets_info) {
auto weak_tablets = get_weak_tablets();
for (auto& weak_tablet : weak_tablets) {
auto tablet = weak_tablet.lock();
if (tablet == nullptr) {
continue;
}
if (tablets_info->size() >= num_tablets) {
return;
}
tablets_info->push_back(tablet->get_tablet_info());
}
}
void CloudTabletMgr::get_topn_tablet_delete_bitmap_score(
uint64_t* max_delete_bitmap_score, uint64_t* max_base_rowset_delete_bitmap_score) {
int64_t max_delete_bitmap_score_tablet_id = 0;
OlapStopWatch watch;
uint64_t total_delete_map_count = 0;
int64_t max_base_rowset_delete_bitmap_score_tablet_id = 0;
int n = config::check_tablet_delete_bitmap_score_top_n;
std::vector<std::pair<std::shared_ptr<CloudTablet>, int64_t>> buf;
buf.reserve(n + 1);
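// First pass: accumulate the total delete bitmap count and keep the top-n tablets by
// delete bitmap count; a second pass over those top-n tablets then computes the max
// base rowset delete bitmap score.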
auto handler = [&](const std::weak_ptr<CloudTablet>& tablet_wk) {
auto t = tablet_wk.lock();
if (!t) return;
uint64_t delete_bitmap_count =
t->tablet_meta()->delete_bitmap().get_delete_bitmap_count();
total_delete_map_count += delete_bitmap_count;
if (delete_bitmap_count > *max_delete_bitmap_score) {
max_delete_bitmap_score_tablet_id = t->tablet_id();
*max_delete_bitmap_score = delete_bitmap_count;
}
buf.emplace_back(std::move(t), delete_bitmap_count);
std::sort(buf.begin(), buf.end(), [](auto& a, auto& b) { return a.second > b.second; });
if (buf.size() > n) {
buf.pop_back();
}
};
auto weak_tablets = get_weak_tablets();
std::for_each(weak_tablets.begin(), weak_tablets.end(), handler);
for (auto& [t, _] : buf) {
t->get_base_rowset_delete_bitmap_count(max_base_rowset_delete_bitmap_score,
&max_base_rowset_delete_bitmap_score_tablet_id);
}
std::stringstream ss;
for (auto& i : buf) {
ss << i.first->tablet_id() << ":" << i.second << ",";
}
LOG(INFO) << "get_topn_tablet_delete_bitmap_score, n=" << n
<< ",tablet size=" << weak_tablets.size()
<< ",total_delete_map_count=" << total_delete_map_count
<< ",cost(us)=" << watch.get_elapse_time_us()
<< ",max_delete_bitmap_score=" << *max_delete_bitmap_score
<< ",max_delete_bitmap_score_tablet_id=" << max_delete_bitmap_score_tablet_id
<< ",max_base_rowset_delete_bitmap_score=" << *max_base_rowset_delete_bitmap_score
<< ",max_base_rowset_delete_bitmap_score_tablet_id="
<< max_base_rowset_delete_bitmap_score_tablet_id << ",tablets=[" << ss.str() << "]";
}
} // namespace doris