blob: 25f0b2327024fdc723e4735c9bba536a915b1149 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "cloud/schema_cloud_dictionary_cache.h"
#include <fmt/core.h>
#include <gen_cpp/olap_file.pb.h>
#include <functional>
#include <memory>
#include <mutex>
#include <unordered_map>
#include "cloud/cloud_meta_mgr.h"
#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet.h"
#include "common/config.h"
#include "gen_cpp/cloud.pb.h" // For GetSchemaDictResponse
#include "runtime/exec_env.h"
namespace doris {
bvar::Adder<int64_t> g_schema_dict_cache_count("schema_dict_cache_count");
bvar::Adder<int64_t> g_replace_dict_keys_to_schema_hit_cache(
"schema_dict_cache_replace_dict_keys_to_schema_hit_count");
bvar::Adder<int64_t> g_replace_schema_to_dict_keys_hit_cache(
"schema_dict_cache_replace_schema_to_dict_keys_hit_count");
bvar::Adder<int64_t> g_schema_dict_cache_miss_count("schema_dict_cache_miss_count");
bvar::Adder<int64_t> g_schema_dict_refresh_count("schema_dict_refresh_count");
void SchemaCloudDictionaryCache::_insert(int64_t index_id, const SchemaCloudDictionarySPtr& dict) {
auto* value = new CacheValue;
value->dict = dict;
auto* lru_handle =
LRUCachePolicy::insert(fmt::format("{}", index_id), value, 1, 0, CachePriority::NORMAL);
g_schema_dict_cache_count << 1;
_cache->release(lru_handle);
}
SchemaCloudDictionarySPtr SchemaCloudDictionaryCache::_lookup(int64_t index_id) {
Cache::Handle* handle = LRUCachePolicy::lookup(fmt::format("{}", index_id));
if (!handle) {
return nullptr;
}
auto* cache_val = static_cast<CacheValue*>(_cache->value(handle));
SchemaCloudDictionarySPtr dict = cache_val ? cache_val->dict : nullptr;
_cache->release(handle); // release handle but dict's shared_ptr still alive
return dict;
}
/**
* Processes dictionary entries by matching items from the given item map.
* It maps items to their dictionary keys, then adds these keys to the rowset metadata.
* If an item is missing in the dictionary, the dictionary key list in rowset meta is cleared
* and the function returns a NotFound status.
*
* @tparam ItemPB The protobuf message type for dictionary items (e.g., ColumnPB or TabletIndexPB).
* @param dict The SchemaCloudDictionary that holds the dictionary entries.
* @param item_dict A mapping from unique identifiers to the dictionary items.
* @param result Pointer to a repeated field where filtered (non-extended) items are stored. May be null.
* @param items The repeated field of items in the original rowset meta.
* @param filter A predicate that returns true if an item should be treated as an extended item and skipped.
* @param add_dict_key_fn A function to be called for each valid item that adds its key to the rowset meta.
* @param rowset_meta Pointer to the rowset metadata; it is cleared if any item is not found.
*
* @return Status::OK if all items are processed successfully; otherwise, a NotFound status.
*/
template <typename ItemPB>
Status process_dictionary(SchemaCloudDictionary& dict,
const google::protobuf::Map<int32_t, ItemPB>& item_dict,
google::protobuf::RepeatedPtrField<ItemPB>* result,
const google::protobuf::RepeatedPtrField<ItemPB>& items,
const std::function<bool(const ItemPB&)>& filter,
const std::function<void(int32_t)>& add_dict_key_fn,
RowsetMetaCloudPB* rowset_meta) {
if (items.empty()) {
return Status::OK();
}
// Use deterministic method to do serialization since structure like
// `google::protobuf::Map`'s serialization is unstable
auto serialize_fn = [](const ItemPB& item) -> std::string {
std::string output;
google::protobuf::io::StringOutputStream string_output_stream(&output);
google::protobuf::io::CodedOutputStream output_stream(&string_output_stream);
output_stream.SetSerializationDeterministic(true);
item.SerializeToCodedStream(&output_stream);
return output;
};
google::protobuf::RepeatedPtrField<ItemPB> none_ext_items;
std::unordered_map<std::string, int> reversed_dict;
for (const auto& [key, val] : item_dict) {
reversed_dict[serialize_fn(val)] = key;
}
for (const auto& item : items) {
if (filter(item)) {
// Filter none extended items, mainly extended columns and extended indexes
*none_ext_items.Add() = item;
continue;
}
const std::string serialized_key = serialize_fn(item);
auto it = reversed_dict.find(serialized_key);
if (it == reversed_dict.end()) {
// If any required item is missing in the dictionary, clear the dict key list and return NotFound.
// ATTN: need to clear dict key list let MS to add key list
rowset_meta->clear_schema_dict_key_list();
g_schema_dict_cache_miss_count << 1;
return Status::NotFound<false>("Not found entry in dict");
}
// Add existed dict key to related dict
add_dict_key_fn(it->second);
}
// clear extended items to prevent writing them to fdb
if (result != nullptr) {
result->Swap(&none_ext_items);
}
return Status::OK();
}
Status SchemaCloudDictionaryCache::replace_schema_to_dict_keys(int64_t index_id,
RowsetMetaCloudPB* rowset_meta) {
if (!rowset_meta->has_variant_type_in_schema()) {
return Status::OK();
}
auto dict = _lookup(index_id);
if (!dict) {
g_schema_dict_cache_miss_count << 1;
return Status::NotFound<false>("Not found dict {}", index_id);
}
auto* dict_list = rowset_meta->mutable_schema_dict_key_list();
// Process column dictionary: add keys for non-extended columns.
auto column_filter = [&](const doris::ColumnPB& col) -> bool { return col.unique_id() >= 0; };
auto column_dict_adder = [&](int32_t key) { dict_list->add_column_dict_key_list(key); };
RETURN_IF_ERROR(process_dictionary<ColumnPB>(
*dict, dict->column_dict(), rowset_meta->mutable_tablet_schema()->mutable_column(),
rowset_meta->tablet_schema().column(), column_filter, column_dict_adder, rowset_meta));
// Process index dictionary: add keys for indexes with an empty index_suffix_name.
auto index_filter = [&](const doris::TabletIndexPB& index_pb) -> bool {
return index_pb.index_suffix_name().empty();
};
auto index_dict_adder = [&](int32_t key) { dict_list->add_index_info_dict_key_list(key); };
RETURN_IF_ERROR(process_dictionary<doris::TabletIndexPB>(
*dict, dict->index_dict(), rowset_meta->mutable_tablet_schema()->mutable_index(),
rowset_meta->tablet_schema().index(), index_filter, index_dict_adder, rowset_meta));
g_replace_schema_to_dict_keys_hit_cache << 1;
return Status::OK();
}
Status SchemaCloudDictionaryCache::_try_fill_schema(
const std::shared_ptr<SchemaCloudDictionary>& dict, const SchemaDictKeyList& dict_keys,
TabletSchemaCloudPB* schema) {
// Process column dictionary keys
for (int key : dict_keys.column_dict_key_list()) {
auto it = dict->column_dict().find(key);
if (it == dict->column_dict().end()) {
return Status::NotFound<false>("Column dict key {} not found", key);
}
*schema->add_column() = it->second;
}
// Process index dictionary keys
for (int key : dict_keys.index_info_dict_key_list()) {
auto it = dict->index_dict().find(key);
if (it == dict->index_dict().end()) {
return Status::NotFound<false>("Index dict key {} not found", key);
}
*schema->add_index() = it->second;
}
return Status::OK();
}
Status SchemaCloudDictionaryCache::refresh_dict(int64_t index_id,
SchemaCloudDictionarySPtr* new_dict) {
// First attempt: use the current cached dictionary.
auto refresh_dict = std::make_shared<SchemaCloudDictionary>();
RETURN_IF_ERROR(static_cast<const CloudStorageEngine&>(ExecEnv::GetInstance()->storage_engine())
.meta_mgr()
.get_schema_dict(index_id, &refresh_dict));
_insert(index_id, refresh_dict);
if (new_dict != nullptr) {
*new_dict = refresh_dict;
}
LOG(INFO) << "refresh dict for index_id=" << index_id;
g_schema_dict_refresh_count << 1;
return Status::OK();
}
Status SchemaCloudDictionaryCache::replace_dict_keys_to_schema(int64_t index_id,
RowsetMetaCloudPB* out) {
// First attempt: use the current cached dictionary
SchemaCloudDictionarySPtr dict = _lookup(index_id);
Status st =
dict ? _try_fill_schema(dict, out->schema_dict_key_list(), out->mutable_tablet_schema())
: Status::NotFound<false>("Schema dict not found in cache");
// If filling fails (possibly due to outdated dictionary data), refresh the dictionary
if (!st.ok()) {
g_schema_dict_cache_miss_count << 1;
RETURN_IF_ERROR(refresh_dict(index_id, &dict));
if (!dict) {
return Status::NotFound<false>("Schema dict not found after refresh, index_id={}",
index_id);
}
// Retry filling the schema with the refreshed dictionary
st = _try_fill_schema(dict, out->schema_dict_key_list(), out->mutable_tablet_schema());
}
g_replace_dict_keys_to_schema_hit_cache << 1;
return st;
}
} // namespace doris