blob: ddcef3fbd8ef7c42c094cf5613e990414ea57ebd [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "cloud/cloud_tablet_hotspot.h"
#include <algorithm>
#include <chrono>
#include <cmath>
#include <cstdint>
#include <mutex>
#include <vector>
#include "runtime/memory/global_memory_arbitrator.h"
#include "storage/tablet/base_tablet.h"
namespace doris {
namespace {
using HotPartitionSnapshot =
std::unordered_map<TabletHotspotMapKey, std::unordered_map<int64_t, TabletHotspotMapValue>,
MapKeyHash>;
constexpr uint64_t kCountHeartbeatInterval = 500000;
constexpr uint64_t kNewCounterLogInterval = 4096;
constexpr size_t kCompactBucketMultiplier = 4;
constexpr size_t kCompactEraseRatioDivisor = 4;
using SystemTimePoint = std::chrono::system_clock::time_point;
using SteadyClock = std::chrono::steady_clock;
double bytes_to_mb(int64_t bytes) {
return static_cast<double>(bytes) / (1024.0 * 1024.0);
}
double ratio_or_zero(uint64_t numerator, uint64_t denominator) {
if (denominator == 0) {
return 0.0;
}
return static_cast<double>(numerator) / static_cast<double>(denominator);
}
int64_t process_memory_usage_for_diag() {
#ifdef BE_TEST
return 0;
#else
return GlobalMemoryArbitrator::process_memory_usage();
#endif
}
uint64_t count_hot_partition_entries(const HotPartitionSnapshot& snapshot) {
uint64_t entries = 0;
for (const auto& [_, partition_to_value] : snapshot) {
entries += partition_to_value.size();
}
return entries;
}
bool should_compact_slot(size_t slot_size_before, size_t slot_size_after, size_t bucket_count_after,
size_t erased_count) {
if (erased_count == 0) {
return false;
}
if (slot_size_after == 0) {
return true;
}
return bucket_count_after > (kCompactBucketMultiplier * slot_size_after) ||
(slot_size_before > 0 && erased_count * kCompactEraseRatioDivisor >= slot_size_before);
}
} // namespace
void TabletHotspot::count(const BaseTablet& tablet) {
count(tablet.tablet_id(), tablet.table_id(), tablet.index_id(), tablet.partition_id());
}
void TabletHotspot::count(int64_t tablet_id, int64_t table_id, int64_t index_id,
int64_t partition_id) {
const uint64_t count_calls_total =
_count_calls_total.fetch_add(1, std::memory_order_relaxed) + 1;
size_t slot_idx = tablet_id % s_slot_size;
auto& slot = _tablets_hotspot[slot_idx];
bool should_log_new_counter = false;
uint64_t new_counter_total = 0;
{
std::lock_guard lock(slot.mtx);
HotspotCounterPtr counter;
if (auto iter = slot.map.find(tablet_id); iter == slot.map.end()) {
counter = std::make_shared<HotspotCounter>(table_id, index_id, partition_id);
slot.map.insert(std::make_pair(tablet_id, counter));
new_counter_total = _new_counter_total.fetch_add(1, std::memory_order_relaxed) + 1;
should_log_new_counter = (new_counter_total % kNewCounterLogInterval == 0);
} else {
counter = iter->second;
_existing_hit_total.fetch_add(1, std::memory_order_relaxed);
}
counter->last_access_time = std::chrono::system_clock::now();
counter->cur_counter.fetch_add(1, std::memory_order_relaxed);
}
if (should_log_new_counter) {
LOG(INFO) << "tablet_hotspot_diag new_counter_total=" << new_counter_total
<< " count_calls_total=" << count_calls_total
<< " existing_hit_total=" << _existing_hit_total.load(std::memory_order_relaxed);
}
if (count_calls_total % kCountHeartbeatInterval == 0) {
LOG(INFO) << "tablet_hotspot_diag count_heartbeat count_calls_total=" << count_calls_total
<< " existing_hit_total=" << _existing_hit_total.load(std::memory_order_relaxed)
<< " new_counter_total=" << _new_counter_total.load(std::memory_order_relaxed);
}
}
TabletHotspot::TabletHotspot(bool start_counter_thread) {
if (start_counter_thread) {
_counter_thread = std::thread(&TabletHotspot::make_dot_point, this);
}
}
TabletHotspot::~TabletHotspot() {
{
std::lock_guard lock(_mtx);
_closed = true;
}
_cond.notify_all();
if (_counter_thread.joinable()) {
_counter_thread.join();
}
}
void get_return_partitions(
const std::unordered_map<TabletHotspotMapKey,
std::unordered_map<int64_t, TabletHotspotMapValue>, MapKeyHash>&
hot_partition,
const std::unordered_map<TabletHotspotMapKey,
std::unordered_map<int64_t, TabletHotspotMapValue>, MapKeyHash>&
last_hot_partition,
std::vector<THotTableMessage>* hot_tables, int& return_partitions, int N) {
for (const auto& [key, partition_to_value] : hot_partition) {
THotTableMessage msg;
msg.table_id = key.first;
msg.index_id = key.second;
for (const auto& [partition_id, value] : partition_to_value) {
if (return_partitions > N) {
return;
}
auto last_value_iter = last_hot_partition.find(key);
if (last_value_iter != last_hot_partition.end()) {
auto last_partition_iter = last_value_iter->second.find(partition_id);
if (last_partition_iter != last_value_iter->second.end()) {
const auto& last_value = last_partition_iter->second;
if (std::abs(static_cast<int64_t>(value.qpd) -
static_cast<int64_t>(last_value.qpd)) < 5 &&
std::abs(static_cast<int64_t>(value.qpw) -
static_cast<int64_t>(last_value.qpw)) < 10 &&
std::abs(static_cast<int64_t>(value.last_access_time) -
static_cast<int64_t>(last_value.last_access_time)) < 60) {
LOG(INFO) << "skip partition_id=" << partition_id << " qpd=" << value.qpd
<< " qpw=" << value.qpw
<< " last_access_time=" << value.last_access_time
<< " last_qpd=" << last_value.qpd
<< " last_qpw=" << last_value.qpw
<< " last_access_time=" << last_value.last_access_time;
continue;
}
}
}
THotPartition new_hot_partition;
new_hot_partition.__set_partition_id(partition_id);
new_hot_partition.__set_query_per_day(value.qpd);
new_hot_partition.__set_query_per_week(value.qpw);
new_hot_partition.__set_last_access_time(value.last_access_time);
msg.hot_partitions.push_back(new_hot_partition);
return_partitions++;
}
msg.__isset.hot_partitions = !msg.hot_partitions.empty();
hot_tables->push_back(std::move(msg));
}
}
void TabletHotspot::get_top_n_hot_partition(std::vector<THotTableMessage>* hot_tables) {
const uint64_t call_id =
_get_top_n_hot_partition_call_total.fetch_add(1, std::memory_order_relaxed) + 1;
uint64_t last_day_tables_before = 0;
uint64_t last_day_entries_before = 0;
uint64_t last_week_tables_before = 0;
uint64_t last_week_entries_before = 0;
{
std::lock_guard lock(_last_partitions_mtx);
last_day_tables_before = _last_day_hot_partitions.size();
last_day_entries_before = count_hot_partition_entries(_last_day_hot_partitions);
last_week_tables_before = _last_week_hot_partitions.size();
last_week_entries_before = count_hot_partition_entries(_last_week_hot_partitions);
}
const int64_t process_mem_before = process_memory_usage_for_diag();
// map<pair<table_id, index_id>, map<partition_id, value>> for day
std::unordered_map<TabletHotspotMapKey, std::unordered_map<int64_t, TabletHotspotMapValue>,
MapKeyHash>
day_hot_partitions;
// map<pair<table_id, index_id>, map<partition_id, value>> for week
std::unordered_map<TabletHotspotMapKey, std::unordered_map<int64_t, TabletHotspotMapValue>,
MapKeyHash>
week_hot_partitions;
std::for_each(_tablets_hotspot.begin(), _tablets_hotspot.end(), [&](HotspotMap& map) {
std::lock_guard lock(map.mtx);
for (auto& [_, counter] : map.map) {
if (counter->qpd() != 0) {
auto& hot_partition = day_hot_partitions[std::make_pair(
counter->table_id, counter->index_id)][counter->partition_id];
hot_partition.qpd = std::max(hot_partition.qpd, counter->qpd());
hot_partition.qpw = std::max(hot_partition.qpw, counter->qpw());
hot_partition.last_access_time =
std::max<int64_t>(hot_partition.last_access_time,
std::chrono::duration_cast<std::chrono::seconds>(
counter->last_access_time.time_since_epoch())
.count());
} else if (counter->qpw() != 0) {
auto& hot_partition = week_hot_partitions[std::make_pair(
counter->table_id, counter->index_id)][counter->partition_id];
hot_partition.qpd = 0;
hot_partition.qpw = std::max(hot_partition.qpw, counter->qpw());
hot_partition.last_access_time =
std::max<int64_t>(hot_partition.last_access_time,
std::chrono::duration_cast<std::chrono::seconds>(
counter->last_access_time.time_since_epoch())
.count());
}
}
});
constexpr int N = 50;
int return_partitions = 0;
const uint64_t day_tables_built = day_hot_partitions.size();
const uint64_t day_entries_built = count_hot_partition_entries(day_hot_partitions);
const uint64_t week_tables_built = week_hot_partitions.size();
const uint64_t week_entries_built = count_hot_partition_entries(week_hot_partitions);
uint64_t last_day_tables_after = 0;
uint64_t last_day_entries_after = 0;
uint64_t last_week_tables_after = 0;
uint64_t last_week_entries_after = 0;
{
std::unique_lock lock(_last_partitions_mtx);
get_return_partitions(day_hot_partitions, _last_day_hot_partitions, hot_tables,
return_partitions, N);
get_return_partitions(week_hot_partitions, _last_week_hot_partitions, hot_tables,
return_partitions, N);
_last_day_hot_partitions = std::move(day_hot_partitions);
_last_week_hot_partitions = std::move(week_hot_partitions);
last_day_tables_after = _last_day_hot_partitions.size();
last_day_entries_after = count_hot_partition_entries(_last_day_hot_partitions);
last_week_tables_after = _last_week_hot_partitions.size();
last_week_entries_after = count_hot_partition_entries(_last_week_hot_partitions);
}
const int64_t process_mem_after = process_memory_usage_for_diag();
LOG(INFO) << "tablet_hotspot_diag get_top_n_hot_partition call_id=" << call_id
<< " day_tables_built=" << day_tables_built
<< " day_entries_built=" << day_entries_built
<< " week_tables_built=" << week_tables_built
<< " week_entries_built=" << week_entries_built
<< " returned_partitions=" << return_partitions
<< " last_day_tables_before=" << last_day_tables_before
<< " last_day_entries_before=" << last_day_entries_before
<< " last_day_tables_after=" << last_day_tables_after
<< " last_day_entries_after=" << last_day_entries_after
<< " last_week_tables_before=" << last_week_tables_before
<< " last_week_entries_before=" << last_week_entries_before
<< " last_week_tables_after=" << last_week_tables_after
<< " last_week_entries_after=" << last_week_entries_after
<< " process_mem_before_mb=" << bytes_to_mb(process_mem_before)
<< " process_mem_after_mb=" << bytes_to_mb(process_mem_after);
}
void HotspotCounter::make_dot_point() {
uint64_t value = cur_counter.exchange(0, std::memory_order_acq_rel);
if (history_counters.size() == week_counters_size) {
uint64_t week_counter_remove = history_counters.back();
uint64_t day_counter_remove = history_counters[day_counters_size - 1];
week_history_counter = week_history_counter - week_counter_remove + value;
day_history_counter = day_history_counter - day_counter_remove + value;
history_counters.pop_back();
} else if (history_counters.size() < day_counters_size) {
week_history_counter += value;
day_history_counter += value;
} else {
week_history_counter += value;
uint64_t day_counter_remove = history_counters[day_counters_size - 1];
day_history_counter = day_history_counter - day_counter_remove + value;
}
history_counters.push_front(value);
}
uint64_t HotspotCounter::qpd() {
return day_history_counter + cur_counter.load();
}
uint64_t HotspotCounter::qpw() {
return week_history_counter + cur_counter.load();
}
bool TabletHotspot::is_gc_eligible(const HotspotCounter& counter, SystemTimePoint now) {
const auto week_window = std::chrono::seconds((HotspotCounter::week_counters_size + 1) *
HotspotCounter::time_interval);
return counter.last_access_time < now - week_window &&
counter.cur_counter.load(std::memory_order_relaxed) == 0 &&
counter.day_history_counter.load(std::memory_order_relaxed) == 0 &&
counter.week_history_counter.load(std::memory_order_relaxed) == 0;
}
TabletHotspot::MaintenanceStats TabletHotspot::run_maintenance_once(SystemTimePoint now) {
MaintenanceStats stats;
auto gc_elapsed = SteadyClock::duration::zero();
for (size_t slot_idx = 0; slot_idx < s_slot_size; ++slot_idx) {
auto& slot = _tablets_hotspot[slot_idx];
std::vector<HotspotCounterPtr> counters;
size_t slot_size_before = 0;
size_t slot_bucket_count_before = 0;
{
std::lock_guard lock(slot.mtx);
slot_size_before = slot.map.size();
slot_bucket_count_before = slot.map.bucket_count();
stats.total_counters_before_gc += slot_size_before;
if (slot_size_before > 0) {
++stats.non_empty_slots_before_gc;
}
stats.max_slot_size_before_gc =
std::max<uint64_t>(stats.max_slot_size_before_gc, slot_size_before);
stats.sum_bucket_count_before_gc += slot_bucket_count_before;
counters.reserve(slot_size_before);
for (auto& [_, counter] : slot.map) {
counters.push_back(counter);
}
}
stats.copied_counters += counters.size();
std::for_each(counters.begin(), counters.end(),
[](HotspotCounterPtr& counter) { counter->make_dot_point(); });
size_t erased_count = 0;
bool compacted = false;
size_t slot_size_after = 0;
size_t slot_bucket_count_after = 0;
auto gc_start = SteadyClock::now();
{
std::lock_guard lock(slot.mtx);
for (auto iter = slot.map.begin(); iter != slot.map.end();) {
if (is_gc_eligible(*iter->second, now)) {
iter = slot.map.erase(iter);
++erased_count;
} else {
++iter;
}
}
if (should_compact_slot(slot_size_before, slot.map.size(), slot.map.bucket_count(),
erased_count)) {
decltype(slot.map) compacted_map;
compacted_map.max_load_factor(slot.map.max_load_factor());
compacted_map.reserve(slot.map.size());
for (const auto& [tablet_id, counter] : slot.map) {
compacted_map.emplace(tablet_id, counter);
}
slot.map.swap(compacted_map);
compacted = true;
}
slot_size_after = slot.map.size();
slot_bucket_count_after = slot.map.bucket_count();
stats.total_counters_after_gc += slot_size_after;
if (slot_size_after > 0) {
++stats.non_empty_slots_after_gc;
}
stats.max_slot_size_after_gc =
std::max<uint64_t>(stats.max_slot_size_after_gc, slot_size_after);
stats.sum_bucket_count_after_gc += slot_bucket_count_after;
}
gc_elapsed += SteadyClock::now() - gc_start;
stats.evicted_counters += erased_count;
if (compacted) {
++stats.compacted_slots;
}
if (erased_count > 0 || compacted) {
LOG(INFO) << "tablet_hotspot_diag gc_slot"
<< " slot_idx=" << slot_idx << " slot_size_before=" << slot_size_before
<< " slot_size_after=" << slot_size_after
<< " bucket_count_before=" << slot_bucket_count_before
<< " bucket_count_after=" << slot_bucket_count_after
<< " erased_count=" << erased_count << " compacted=" << compacted;
}
}
stats.gc_elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>(gc_elapsed).count();
return stats;
}
void TabletHotspot::make_dot_point() {
while (true) {
{
std::unique_lock lock(_mtx);
_cond.wait_for(lock, std::chrono::seconds(HotspotCounter::time_interval),
[this]() { return _closed; });
if (_closed) {
break;
}
}
const uint64_t round =
_make_dot_point_round_total.fetch_add(1, std::memory_order_relaxed) + 1;
const uint64_t count_calls_total = _count_calls_total.load(std::memory_order_relaxed);
const uint64_t existing_hit_total = _existing_hit_total.load(std::memory_order_relaxed);
const uint64_t new_counter_total_before =
_new_counter_total.load(std::memory_order_relaxed);
const int64_t process_mem_before = process_memory_usage_for_diag();
const auto now = std::chrono::system_clock::now();
const MaintenanceStats stats = run_maintenance_once(now);
const int64_t process_mem_after = process_memory_usage_for_diag();
const uint64_t new_counter_total_after = _new_counter_total.load(std::memory_order_relaxed);
const uint64_t prev_round_new_counter_total = _last_round_new_counter_total.exchange(
new_counter_total_after, std::memory_order_relaxed);
const uint64_t new_counter_delta =
new_counter_total_after >= prev_round_new_counter_total
? (new_counter_total_after - prev_round_new_counter_total)
: 0;
LOG(INFO) << "tablet_hotspot_diag make_dot_point round=" << round
<< " total_counters=" << stats.total_counters_before_gc
<< " total_counters_before_gc=" << stats.total_counters_before_gc
<< " total_counters_after_gc=" << stats.total_counters_after_gc
<< " non_empty_slots=" << stats.non_empty_slots_before_gc
<< " non_empty_slots_before_gc=" << stats.non_empty_slots_before_gc
<< " non_empty_slots_after_gc=" << stats.non_empty_slots_after_gc
<< " max_slot_size=" << stats.max_slot_size_before_gc
<< " max_slot_size_before_gc=" << stats.max_slot_size_before_gc
<< " max_slot_size_after_gc=" << stats.max_slot_size_after_gc
<< " sum_bucket_count=" << stats.sum_bucket_count_before_gc
<< " sum_bucket_count_before_gc=" << stats.sum_bucket_count_before_gc
<< " sum_bucket_count_after_gc=" << stats.sum_bucket_count_after_gc
<< " copied_counters=" << stats.copied_counters
<< " evicted_counters=" << stats.evicted_counters << " evicted_ratio="
<< ratio_or_zero(stats.evicted_counters, stats.total_counters_before_gc)
<< " compacted_slots=" << stats.compacted_slots << " bucket_reclaim_ratio="
<< ratio_or_zero(
stats.sum_bucket_count_before_gc >= stats.sum_bucket_count_after_gc
? (stats.sum_bucket_count_before_gc -
stats.sum_bucket_count_after_gc)
: 0,
stats.sum_bucket_count_before_gc)
<< " gc_elapsed_ms=" << stats.gc_elapsed_ms
<< " count_calls_total=" << count_calls_total
<< " existing_hit_total=" << existing_hit_total
<< " new_counter_total_before=" << new_counter_total_before
<< " new_counter_total_after=" << new_counter_total_after
<< " new_counter_delta=" << new_counter_delta
<< " process_mem_before_mb=" << bytes_to_mb(process_mem_before)
<< " process_mem_after_mb=" << bytes_to_mb(process_mem_after);
}
}
} // namespace doris