blob: a0f3b20142911a6335a6400cee01c99aab1b796e [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "cloud/cloud_txn_delete_bitmap_cache.h"
#include <fmt/core.h>
#include <chrono>
#include <memory>
#include <shared_mutex>
#include "cloud/config.h"
#include "common/status.h"
#include "cpp/sync_point.h"
#include "olap/olap_common.h"
#include "olap/tablet_meta.h"
#include "olap/txn_manager.h"
namespace doris {
CloudTxnDeleteBitmapCache::CloudTxnDeleteBitmapCache(size_t size_in_bytes)
: LRUCachePolicy(CachePolicy::CacheType::CLOUD_TXN_DELETE_BITMAP_CACHE, size_in_bytes,
LRUCacheType::SIZE, 86400, 4),
_stop_latch(1) {}
CloudTxnDeleteBitmapCache::~CloudTxnDeleteBitmapCache() {
_stop_latch.count_down();
_clean_thread->join();
}
Status CloudTxnDeleteBitmapCache::init() {
auto st = Thread::create(
"CloudTxnDeleteBitmapCache", "clean_txn_dbm_thread",
[this]() { this->_clean_thread_callback(); }, &_clean_thread);
if (!st.ok()) {
LOG(WARNING) << "failed to create thread for CloudTxnDeleteBitmapCache, error: " << st;
}
return st;
}
Status CloudTxnDeleteBitmapCache::get_tablet_txn_info(
TTransactionId transaction_id, int64_t tablet_id, RowsetSharedPtr* rowset,
DeleteBitmapPtr* delete_bitmap, RowsetIdUnorderedSet* rowset_ids, int64_t* txn_expiration,
std::shared_ptr<PartialUpdateInfo>* partial_update_info,
std::shared_ptr<PublishStatus>* publish_status, TxnPublishInfo* previous_publish_info) {
{
std::shared_lock<std::shared_mutex> rlock(_rwlock);
TxnKey key(transaction_id, tablet_id);
DBUG_EXECUTE_IF("CloudTxnDeleteBitmapCache.get_tablet_txn_info.not_found", {
return Status::Error<ErrorCode::NOT_FOUND>(
"not found txn info for test, tablet_id={}, transaction_id={}", tablet_id,
transaction_id);
});
auto iter = _txn_map.find(key);
if (iter == _txn_map.end()) {
return Status::Error<ErrorCode::NOT_FOUND, false>(
"not found txn info, tablet_id={}, transaction_id={}", tablet_id,
transaction_id);
}
*rowset = iter->second.rowset;
*txn_expiration = iter->second.txn_expiration;
*partial_update_info = iter->second.partial_update_info;
*publish_status = iter->second.publish_status;
*previous_publish_info = iter->second.publish_info;
}
auto st = get_delete_bitmap(transaction_id, tablet_id, delete_bitmap, rowset_ids, nullptr);
if (st.is<ErrorCode::NOT_FOUND>()) {
// Because of the rowset_ids become empty, all delete bitmap
// will be recalculate in CalcDeleteBitmapTask
if (delete_bitmap != nullptr) {
*delete_bitmap = std::make_shared<DeleteBitmap>(tablet_id);
}
// to avoid to skip calculating
**publish_status = PublishStatus::INIT;
return Status::OK();
}
return st;
}
Status CloudTxnDeleteBitmapCache::get_delete_bitmap(
TTransactionId transaction_id, int64_t tablet_id, DeleteBitmapPtr* delete_bitmap,
RowsetIdUnorderedSet* rowset_ids, std::shared_ptr<PublishStatus>* publish_status) {
if (publish_status) {
std::shared_lock<std::shared_mutex> rlock(_rwlock);
TxnKey txn_key(transaction_id, tablet_id);
auto iter = _txn_map.find(txn_key);
if (iter == _txn_map.end()) {
return Status::Error<ErrorCode::NOT_FOUND, false>(
"not found txn info, tablet_id={}, transaction_id={}", tablet_id,
transaction_id);
}
*publish_status = iter->second.publish_status;
}
std::string key_str = fmt::format("{}/{}", transaction_id, tablet_id);
CacheKey key(key_str);
Cache::Handle* handle = lookup(key);
DBUG_EXECUTE_IF("CloudTxnDeleteBitmapCache::get_delete_bitmap.cache_miss", {
handle = nullptr;
LOG(INFO) << "CloudTxnDeleteBitmapCache::get_delete_bitmap.cache_miss, make cache missed "
"when get delete bitmap, txn_id:"
<< transaction_id << ", tablet_id: " << tablet_id;
});
DeleteBitmapCacheValue* val =
handle == nullptr ? nullptr : reinterpret_cast<DeleteBitmapCacheValue*>(value(handle));
if (val) {
*delete_bitmap = val->delete_bitmap;
if (rowset_ids) {
*rowset_ids = val->rowset_ids;
}
// must call release handle to reduce the reference count,
// otherwise there will be memory leak
release(handle);
} else {
LOG_INFO("cache missed when get delete bitmap")
.tag("txn_id", transaction_id)
.tag("tablet_id", tablet_id);
return Status::Error<ErrorCode::NOT_FOUND, false>(
"cache missed when get delete bitmap, tablet_id={}, transaction_id={}", tablet_id,
transaction_id);
}
return Status::OK();
}
void CloudTxnDeleteBitmapCache::set_tablet_txn_info(
TTransactionId transaction_id, int64_t tablet_id, DeleteBitmapPtr delete_bitmap,
const RowsetIdUnorderedSet& rowset_ids, RowsetSharedPtr rowset, int64_t txn_expiration,
std::shared_ptr<PartialUpdateInfo> partial_update_info) {
int64_t txn_expiration_min =
duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch())
.count() +
config::tablet_txn_info_min_expired_seconds;
txn_expiration = std::max(txn_expiration_min, txn_expiration);
{
std::unique_lock<std::shared_mutex> wlock(_rwlock);
TxnKey txn_key(transaction_id, tablet_id);
std::shared_ptr<PublishStatus> publish_status =
std::make_shared<PublishStatus>(PublishStatus::INIT);
_txn_map[txn_key] = TxnVal(rowset, txn_expiration, std::move(partial_update_info),
std::move(publish_status));
_expiration_txn.emplace(txn_expiration, txn_key);
}
std::string key_str = fmt::format("{}/{}", transaction_id, tablet_id);
CacheKey key(key_str);
auto val = new DeleteBitmapCacheValue(delete_bitmap, rowset_ids);
size_t charge = sizeof(DeleteBitmapCacheValue);
for (auto& [k, v] : val->delete_bitmap->delete_bitmap) {
charge += v.getSizeInBytes();
}
auto* handle = insert(key, val, charge, charge, CachePriority::NORMAL);
// must call release handle to reduce the reference count,
// otherwise there will be memory leak
release(handle);
LOG_INFO("set txn related delete bitmap")
.tag("txn_id", transaction_id)
.tag("expiration", txn_expiration)
.tag("tablet_id", tablet_id)
.tag("delete_bitmap_size", charge)
.tag("delete_bitmap_count", delete_bitmap->get_delete_bitmap_count())
.tag("delete_bitmap_cardinality", delete_bitmap->cardinality());
}
Status CloudTxnDeleteBitmapCache::update_tablet_txn_info(TTransactionId transaction_id,
int64_t tablet_id,
DeleteBitmapPtr delete_bitmap,
const RowsetIdUnorderedSet& rowset_ids,
PublishStatus publish_status,
TxnPublishInfo publish_info) {
{
std::unique_lock<std::shared_mutex> wlock(_rwlock);
TxnKey txn_key(transaction_id, tablet_id);
if (!_txn_map.contains(txn_key)) {
return Status::Error<ErrorCode::NOT_FOUND, false>(
"not found txn info, tablet_id={}, transaction_id={}, may be expired and be "
"removed",
tablet_id, transaction_id);
}
TxnVal& txn_val = _txn_map[txn_key];
*(txn_val.publish_status) = publish_status;
if (publish_status == PublishStatus::SUCCEED) {
txn_val.publish_info = publish_info;
}
}
std::string key_str = fmt::format("{}/{}", transaction_id, tablet_id);
CacheKey key(key_str);
auto val = new DeleteBitmapCacheValue(delete_bitmap, rowset_ids);
size_t charge = sizeof(DeleteBitmapCacheValue);
for (auto& [k, v] : val->delete_bitmap->delete_bitmap) {
charge += v.getSizeInBytes();
}
auto* handle = insert(key, val, charge, charge, CachePriority::NORMAL);
// must call release handle to reduce the reference count,
// otherwise there will be memory leak
release(handle);
if (config::enable_mow_verbose_log) {
LOG_INFO("update txn related delete bitmap")
.tag("txn_id", transaction_id)
.tag("tablt_id", tablet_id)
.tag("delete_bitmap_size", charge)
.tag("delete_bitmap_count", delete_bitmap->get_delete_bitmap_count())
.tag("delete_bitmap_cardinality", delete_bitmap->cardinality())
.tag("publish_status", static_cast<int>(publish_status));
}
return Status::OK();
}
void CloudTxnDeleteBitmapCache::remove_expired_tablet_txn_info() {
TEST_SYNC_POINT_RETURN_WITH_VOID("CloudTxnDeleteBitmapCache::remove_expired_tablet_txn_info");
std::unique_lock<std::shared_mutex> wlock(_rwlock);
while (!_expiration_txn.empty()) {
auto iter = _expiration_txn.begin();
if (_txn_map.find(iter->second) == _txn_map.end()) {
_expiration_txn.erase(iter);
continue;
}
int64_t current_time = duration_cast<std::chrono::seconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
if (iter->first > current_time) {
break;
}
auto txn_iter = _txn_map.find(iter->second);
if ((txn_iter != _txn_map.end()) && (iter->first == txn_iter->second.txn_expiration)) {
LOG_INFO("clean expired delete bitmap")
.tag("txn_id", txn_iter->first.txn_id)
.tag("expiration", txn_iter->second.txn_expiration)
.tag("tablt_id", txn_iter->first.tablet_id);
std::string key_str = std::to_string(txn_iter->first.txn_id) + "/" +
std::to_string(txn_iter->first.tablet_id); // Cache key container
CacheKey cache_key(key_str);
erase(cache_key);
_txn_map.erase(iter->second);
}
_expiration_txn.erase(iter);
}
}
void CloudTxnDeleteBitmapCache::remove_unused_tablet_txn_info(TTransactionId transaction_id,
int64_t tablet_id) {
std::unique_lock<std::shared_mutex> wlock(_rwlock);
TxnKey txn_key(transaction_id, tablet_id);
auto txn_iter = _txn_map.find(txn_key);
if (txn_iter != _txn_map.end()) {
LOG_INFO("remove unused tablet txn info")
.tag("txn_id", txn_iter->first.txn_id)
.tag("tablt_id", txn_iter->first.tablet_id);
std::string key_str = std::to_string(txn_iter->first.txn_id) + "/" +
std::to_string(txn_iter->first.tablet_id); // Cache key container
CacheKey cache_key(key_str);
erase(cache_key);
_txn_map.erase(txn_key);
}
}
void CloudTxnDeleteBitmapCache::_clean_thread_callback() {
do {
remove_expired_tablet_txn_info();
} while (!_stop_latch.wait_for(
std::chrono::seconds(config::remove_expired_tablet_txn_info_interval_seconds)));
}
} // namespace doris