// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
// This file is copied from
// https://github.com/ClickHouse/ClickHouse/blob/master/src/Interpreters/Cache/FileCache.cpp
// and modified by Doris

#include "io/cache/block_file_cache.h"

#include <cstdio>
#include <exception>
#include <fstream>
#include <unordered_set>

#include "common/status.h"
#include "cpp/sync_point.h"
#include "gen_cpp/file_cache.pb.h"
#include "runtime/exec_env.h"

#if defined(__APPLE__)
#include <sys/mount.h>
#else
#include <sys/statfs.h>
#endif

#include <chrono> // IWYU pragma: keep
#include <mutex>
#include <ranges>

#include "common/cast_set.h"
#include "common/config.h"
#include "common/logging.h"
#include "io/cache/block_file_cache_ttl_mgr.h"
#include "io/cache/file_block.h"
#include "io/cache/file_cache_common.h"
#include "io/cache/fs_file_cache_storage.h"
#include "io/cache/mem_file_cache_storage.h"
#include "util/runtime_profile.h"
#include "util/stack_util.h"
#include "util/stopwatch.hpp"
#include "util/thread.h"
#include "util/time.h"
#include "vec/common/sip_hash.h"
#include "vec/common/uint128.h"
namespace doris::io {
#include "common/compile_check_begin.h"

// Insert a block pointer into one shard while swallowing allocation failures.
bool NeedUpdateLRUBlocks::insert(FileBlockSPtr block) {
    if (!block) {
        return false;
    }
    try {
        auto* raw_ptr = block.get();
        auto idx = shard_index(raw_ptr);
        auto& shard = _shards[idx];
        std::lock_guard lock(shard.mutex);
        auto [_, inserted] = shard.entries.emplace(raw_ptr, std::move(block));
        if (inserted) {
            _size.fetch_add(1, std::memory_order_relaxed);
        }
        return inserted;
    } catch (const std::exception& e) {
        LOG(WARNING) << "Failed to enqueue block for LRU update: " << e.what();
    } catch (...) {
        LOG(WARNING) << "Failed to enqueue block for LRU update: unknown error";
    }
    return false;
}

// Drain up to `limit` unique blocks to the caller, keeping the structure consistent on failures.
size_t NeedUpdateLRUBlocks::drain(size_t limit, std::vector<FileBlockSPtr>* output) {
    if (limit == 0 || output == nullptr) {
        return 0;
    }
    size_t drained = 0;
    try {
        output->reserve(output->size() + std::min(limit, size()));
        for (auto& shard : _shards) {
            if (drained >= limit) {
                break;
            }
            std::lock_guard lock(shard.mutex);
            auto it = shard.entries.begin();
            size_t shard_drained = 0;
            while (it != shard.entries.end() && drained + shard_drained < limit) {
                output->emplace_back(std::move(it->second));
                it = shard.entries.erase(it);
                ++shard_drained;
            }
            if (shard_drained > 0) {
                _size.fetch_sub(shard_drained, std::memory_order_relaxed);
                drained += shard_drained;
            }
        }
    } catch (const std::exception& e) {
        LOG(WARNING) << "Failed to drain LRU update blocks: " << e.what();
    } catch (...) {
        LOG(WARNING) << "Failed to drain LRU update blocks: unknown error";
    }
    return drained;
}

// Remove every pending block, guarding against unexpected exceptions.
void NeedUpdateLRUBlocks::clear() {
    try {
        for (auto& shard : _shards) {
            std::lock_guard lock(shard.mutex);
            if (!shard.entries.empty()) {
                auto removed = shard.entries.size();
                shard.entries.clear();
                _size.fetch_sub(removed, std::memory_order_relaxed);
            }
        }
    } catch (const std::exception& e) {
        LOG(WARNING) << "Failed to clear LRU update blocks: " << e.what();
    } catch (...) {
        LOG(WARNING) << "Failed to clear LRU update blocks: unknown error";
    }
}

size_t NeedUpdateLRUBlocks::shard_index(FileBlock* ptr) const {
    DCHECK(ptr != nullptr);
    return std::hash<FileBlock*> {}(ptr)&kShardMask;
}

BlockFileCache::BlockFileCache(const std::string& cache_base_path,
                               const FileCacheSettings& cache_settings)
        : _cache_base_path(cache_base_path),
          _capacity(cache_settings.capacity),
          _max_file_block_size(cache_settings.max_file_block_size) {
    _cur_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(_cache_base_path.c_str(),
                                                                     "file_cache_cache_size", 0);
    _cache_capacity_metrics = std::make_shared<bvar::Status<size_t>>(
            _cache_base_path.c_str(), "file_cache_capacity", _capacity);
    _cur_ttl_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(
            _cache_base_path.c_str(), "file_cache_ttl_cache_size", 0);
    _cur_normal_queue_element_count_metrics = std::make_shared<bvar::Status<size_t>>(
            _cache_base_path.c_str(), "file_cache_normal_queue_element_count", 0);
    _cur_ttl_cache_lru_queue_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(
            _cache_base_path.c_str(), "file_cache_ttl_cache_lru_queue_size", 0);
    _cur_ttl_cache_lru_queue_element_count_metrics = std::make_shared<bvar::Status<size_t>>(
            _cache_base_path.c_str(), "file_cache_ttl_cache_lru_queue_element_count", 0);
    _cur_normal_queue_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(
            _cache_base_path.c_str(), "file_cache_normal_queue_cache_size", 0);
    _cur_index_queue_element_count_metrics = std::make_shared<bvar::Status<size_t>>(
            _cache_base_path.c_str(), "file_cache_index_queue_element_count", 0);
    _cur_index_queue_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(
            _cache_base_path.c_str(), "file_cache_index_queue_cache_size", 0);
    _cur_disposable_queue_element_count_metrics = std::make_shared<bvar::Status<size_t>>(
            _cache_base_path.c_str(), "file_cache_disposable_queue_element_count", 0);
    _cur_disposable_queue_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(
            _cache_base_path.c_str(), "file_cache_disposable_queue_cache_size", 0);

    _queue_evict_size_metrics[0] = std::make_shared<bvar::Adder<size_t>>(
            _cache_base_path.c_str(), "file_cache_index_queue_evict_size");
    _queue_evict_size_metrics[1] = std::make_shared<bvar::Adder<size_t>>(
            _cache_base_path.c_str(), "file_cache_normal_queue_evict_size");
    _queue_evict_size_metrics[2] = std::make_shared<bvar::Adder<size_t>>(
            _cache_base_path.c_str(), "file_cache_disposable_queue_evict_size");
    _queue_evict_size_metrics[3] = std::make_shared<bvar::Adder<size_t>>(
            _cache_base_path.c_str(), "file_cache_ttl_cache_evict_size");
    _total_evict_size_metrics = std::make_shared<bvar::Adder<size_t>>(
            _cache_base_path.c_str(), "file_cache_total_evict_size");
    _total_read_size_metrics = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                                     "file_cache_total_read_size");
    _total_hit_size_metrics = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                                    "file_cache_total_hit_size");
    _gc_evict_bytes_metrics = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                                    "file_cache_gc_evict_bytes");
    _gc_evict_count_metrics = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                                    "file_cache_gc_evict_count");

    _evict_by_time_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::NORMAL] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_time_disposable_to_normal");
    _evict_by_time_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::INDEX] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_time_disposable_to_index");
    _evict_by_time_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::TTL] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_time_disposable_to_ttl");
    _evict_by_time_metrics_matrix[FileCacheType::NORMAL][FileCacheType::DISPOSABLE] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_time_normal_to_disposable");
    _evict_by_time_metrics_matrix[FileCacheType::NORMAL][FileCacheType::INDEX] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_time_normal_to_index");
    _evict_by_time_metrics_matrix[FileCacheType::NORMAL][FileCacheType::TTL] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_time_normal_to_ttl");
    _evict_by_time_metrics_matrix[FileCacheType::INDEX][FileCacheType::DISPOSABLE] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_time_index_to_disposable");
    _evict_by_time_metrics_matrix[FileCacheType::INDEX][FileCacheType::NORMAL] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_time_index_to_normal");
    _evict_by_time_metrics_matrix[FileCacheType::INDEX][FileCacheType::TTL] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_time_index_to_ttl");
    _evict_by_time_metrics_matrix[FileCacheType::TTL][FileCacheType::DISPOSABLE] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_time_ttl_to_disposable");
    _evict_by_time_metrics_matrix[FileCacheType::TTL][FileCacheType::NORMAL] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_time_ttl_to_normal");
    _evict_by_time_metrics_matrix[FileCacheType::TTL][FileCacheType::INDEX] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_time_ttl_to_index");

    _evict_by_self_lru_metrics_matrix[FileCacheType::DISPOSABLE] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_self_lru_disposable");
    _evict_by_self_lru_metrics_matrix[FileCacheType::NORMAL] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_self_lru_normal");
    _evict_by_self_lru_metrics_matrix[FileCacheType::INDEX] = std::make_shared<bvar::Adder<size_t>>(
            _cache_base_path.c_str(), "file_cache_evict_by_self_lru_index");
    _evict_by_self_lru_metrics_matrix[FileCacheType::TTL] = std::make_shared<bvar::Adder<size_t>>(
            _cache_base_path.c_str(), "file_cache_evict_by_self_lru_ttl");

    _evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::NORMAL] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_size_disposable_to_normal");
    _evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::INDEX] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_size_disposable_to_index");
    _evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::TTL] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_size_disposable_to_ttl");
    _evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::DISPOSABLE] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_size_normal_to_disposable");
    _evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::INDEX] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_size_normal_to_index");
    _evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::TTL] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_size_normal_to_ttl");
    _evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::DISPOSABLE] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_size_index_to_disposable");
    _evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::NORMAL] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_size_index_to_normal");
    _evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::TTL] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_size_index_to_ttl");
    _evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::DISPOSABLE] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_size_ttl_to_disposable");
    _evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::NORMAL] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_size_ttl_to_normal");
    _evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::INDEX] =
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                  "file_cache_evict_by_size_ttl_to_index");

    _evict_by_try_release = std::make_shared<bvar::Adder<size_t>>(
            _cache_base_path.c_str(), "file_cache_evict_by_try_release");

    _num_read_blocks = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                             "file_cache_num_read_blocks");
    _num_hit_blocks = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                            "file_cache_num_hit_blocks");
    _num_removed_blocks = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
                                                                "file_cache_num_removed_blocks");

    _no_warmup_num_read_blocks = std::make_shared<bvar::Adder<size_t>>(
            _cache_base_path.c_str(), "file_cache_no_warmup_num_read_blocks");
    _no_warmup_num_hit_blocks = std::make_shared<bvar::Adder<size_t>>(
            _cache_base_path.c_str(), "file_cache_no_warmup_num_hit_blocks");

#ifndef BE_TEST
    _num_hit_blocks_5m = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
            _cache_base_path.c_str(), "file_cache_num_hit_blocks_5m", _num_hit_blocks.get(), 300);
    _num_read_blocks_5m = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
            _cache_base_path.c_str(), "file_cache_num_read_blocks_5m", _num_read_blocks.get(), 300);
    _num_hit_blocks_1h = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
            _cache_base_path.c_str(), "file_cache_num_hit_blocks_1h", _num_hit_blocks.get(), 3600);
    _num_read_blocks_1h = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
            _cache_base_path.c_str(), "file_cache_num_read_blocks_1h", _num_read_blocks.get(),
            3600);
    _no_warmup_num_hit_blocks_5m = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
            _cache_base_path.c_str(), "file_cache_no_warmup_num_hit_blocks_5m",
            _no_warmup_num_hit_blocks.get(), 300);
    _no_warmup_num_read_blocks_5m = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
            _cache_base_path.c_str(), "file_cache_no_warmup_num_read_blocks_5m",
            _no_warmup_num_read_blocks.get(), 300);
    _no_warmup_num_hit_blocks_1h = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
            _cache_base_path.c_str(), "file_cache_no_warmup_num_hit_blocks_1h",
            _no_warmup_num_hit_blocks.get(), 3600);
    _no_warmup_num_read_blocks_1h = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
            _cache_base_path.c_str(), "file_cache_no_warmup_num_read_blocks_1h",
            _no_warmup_num_read_blocks.get(), 3600);
#endif

    _hit_ratio = std::make_shared<bvar::Status<double>>(_cache_base_path.c_str(),
                                                        "file_cache_hit_ratio", 0.0);
    _hit_ratio_5m = std::make_shared<bvar::Status<double>>(_cache_base_path.c_str(),
                                                           "file_cache_hit_ratio_5m", 0.0);
    _hit_ratio_1h = std::make_shared<bvar::Status<double>>(_cache_base_path.c_str(),
                                                           "file_cache_hit_ratio_1h", 0.0);

    _no_warmup_hit_ratio = std::make_shared<bvar::Status<double>>(
            _cache_base_path.c_str(), "file_cache_no_warmup_hit_ratio", 0.0);
    _no_warmup_hit_ratio_5m = std::make_shared<bvar::Status<double>>(
            _cache_base_path.c_str(), "file_cache_no_warmup_hit_ratio_5m", 0.0);
    _no_warmup_hit_ratio_1h = std::make_shared<bvar::Status<double>>(
            _cache_base_path.c_str(), "file_cache_no_warmup_hit_ratio_1h", 0.0);

    _disk_limit_mode_metrics = std::make_shared<bvar::Status<size_t>>(
            _cache_base_path.c_str(), "file_cache_disk_limit_mode", 0);
    _need_evict_cache_in_advance_metrics = std::make_shared<bvar::Status<size_t>>(
            _cache_base_path.c_str(), "file_cache_need_evict_cache_in_advance", 0);
    _meta_store_write_queue_size_metrics = std::make_shared<bvar::Status<size_t>>(
            _cache_base_path.c_str(), "file_cache_meta_store_write_queue_size", 0);

    _cache_lock_wait_time_us = std::make_shared<bvar::LatencyRecorder>(
            _cache_base_path.c_str(), "file_cache_cache_lock_wait_time_us");
    _get_or_set_latency_us = std::make_shared<bvar::LatencyRecorder>(
            _cache_base_path.c_str(), "file_cache_get_or_set_latency_us");
    _storage_sync_remove_latency_us = std::make_shared<bvar::LatencyRecorder>(
            _cache_base_path.c_str(), "file_cache_storage_sync_remove_latency_us");
    _storage_retry_sync_remove_latency_us = std::make_shared<bvar::LatencyRecorder>(
            _cache_base_path.c_str(), "file_cache_storage_retry_sync_remove_latency_us");
    _storage_async_remove_latency_us = std::make_shared<bvar::LatencyRecorder>(
            _cache_base_path.c_str(), "file_cache_storage_async_remove_latency_us");
    _evict_in_advance_latency_us = std::make_shared<bvar::LatencyRecorder>(
            _cache_base_path.c_str(), "file_cache_evict_in_advance_latency_us");
    _lru_dump_latency_us = std::make_shared<bvar::LatencyRecorder>(
            _cache_base_path.c_str(), "file_cache_lru_dump_latency_us");
    _recycle_keys_length_recorder = std::make_shared<bvar::LatencyRecorder>(
            _cache_base_path.c_str(), "file_cache_recycle_keys_length");
    _need_update_lru_blocks_length_recorder = std::make_shared<bvar::LatencyRecorder>(
            _cache_base_path.c_str(), "file_cache_need_update_lru_blocks_length");
    _update_lru_blocks_latency_us = std::make_shared<bvar::LatencyRecorder>(
            _cache_base_path.c_str(), "file_cache_update_lru_blocks_latency_us");
    _ttl_gc_latency_us = std::make_shared<bvar::LatencyRecorder>(_cache_base_path.c_str(),
                                                                 "file_cache_ttl_gc_latency_us");
    _shadow_queue_levenshtein_distance = std::make_shared<bvar::LatencyRecorder>(
            _cache_base_path.c_str(), "file_cache_shadow_queue_levenshtein_distance");

    _disposable_queue = LRUQueue(cache_settings.disposable_queue_size,
                                 cache_settings.disposable_queue_elements, 60 * 60);
    _index_queue = LRUQueue(cache_settings.index_queue_size, cache_settings.index_queue_elements,
                            7 * 24 * 60 * 60);
    _normal_queue = LRUQueue(cache_settings.query_queue_size, cache_settings.query_queue_elements,
                             24 * 60 * 60);
    _ttl_queue = LRUQueue(cache_settings.ttl_queue_size, cache_settings.ttl_queue_elements,
                          std::numeric_limits<int>::max());

    _lru_recorder = std::make_unique<LRUQueueRecorder>(this);
    _lru_dumper = std::make_unique<CacheLRUDumper>(this, _lru_recorder.get());
    if (cache_settings.storage == "memory") {
        _storage = std::make_unique<MemFileCacheStorage>();
        _cache_base_path = "memory";
    } else {
        _storage = std::make_unique<FSFileCacheStorage>();
    }

    LOG(INFO) << "file cache path= " << _cache_base_path << " " << cache_settings.to_string();
}

UInt128Wrapper BlockFileCache::hash(const std::string& path) {
    uint128_t value;
    sip_hash128(path.data(), path.size(), reinterpret_cast<char*>(&value));
    return UInt128Wrapper(value);
}

BlockFileCache::QueryFileCacheContextHolderPtr BlockFileCache::get_query_context_holder(
        const TUniqueId& query_id, int file_cache_query_limit_percent) {
    SCOPED_CACHE_LOCK(_mutex, this);
    if (!config::enable_file_cache_query_limit) {
        return {};
    }

    /// if enable_filesystem_query_cache_limit is true,
    /// we create context query for current query.
    auto context = get_or_set_query_context(query_id, cache_lock, file_cache_query_limit_percent);
    return std::make_unique<QueryFileCacheContextHolder>(query_id, this, context);
}

BlockFileCache::QueryFileCacheContextPtr BlockFileCache::get_query_context(
        const TUniqueId& query_id, std::lock_guard<std::mutex>& cache_lock) {
    auto query_iter = _query_map.find(query_id);
    return (query_iter == _query_map.end()) ? nullptr : query_iter->second;
}

void BlockFileCache::remove_query_context(const TUniqueId& query_id) {
    SCOPED_CACHE_LOCK(_mutex, this);
    const auto& query_iter = _query_map.find(query_id);

    if (query_iter != _query_map.end() && query_iter->second.use_count() <= 1) {
        _query_map.erase(query_iter);
    }
}

BlockFileCache::QueryFileCacheContextPtr BlockFileCache::get_or_set_query_context(
        const TUniqueId& query_id, std::lock_guard<std::mutex>& cache_lock,
        int file_cache_query_limit_percent) {
    if (query_id.lo == 0 && query_id.hi == 0) {
        return nullptr;
    }

    auto context = get_query_context(query_id, cache_lock);
    if (context) {
        return context;
    }

    size_t file_cache_query_limit_size = _capacity * file_cache_query_limit_percent / 100;
    if (file_cache_query_limit_size < 268435456) {
        LOG(WARNING) << "The user-set file cache query limit (" << file_cache_query_limit_size
                     << " bytes) is less than the 256MB recommended minimum. "
                     << "Consider increasing the session variable 'file_cache_query_limit_percent'"
                     << " from its current value " << file_cache_query_limit_percent << "%.";
    }
    auto query_context = std::make_shared<QueryFileCacheContext>(file_cache_query_limit_size);
    auto query_iter = _query_map.emplace(query_id, query_context).first;
    return query_iter->second;
}

void BlockFileCache::QueryFileCacheContext::remove(const UInt128Wrapper& hash, size_t offset,
                                                   std::lock_guard<std::mutex>& cache_lock) {
    auto pair = std::make_pair(hash, offset);
    auto record = records.find(pair);
    DCHECK(record != records.end());
    auto iter = record->second;
    records.erase(pair);
    lru_queue.remove(iter, cache_lock);
}

void BlockFileCache::QueryFileCacheContext::reserve(const UInt128Wrapper& hash, size_t offset,
                                                    size_t size,
                                                    std::lock_guard<std::mutex>& cache_lock) {
    auto pair = std::make_pair(hash, offset);
    if (records.find(pair) == records.end()) {
        auto queue_iter = lru_queue.add(hash, offset, size, cache_lock);
        records.insert({pair, queue_iter});
    }
}

Status BlockFileCache::initialize() {
    SCOPED_CACHE_LOCK(_mutex, this);
    return initialize_unlocked(cache_lock);
}

Status BlockFileCache::initialize_unlocked(std::lock_guard<std::mutex>& cache_lock) {
    DCHECK(!_is_initialized);
    _is_initialized = true;
    if (config::file_cache_background_lru_dump_tail_record_num > 0) {
        // requirements:
        // 1. restored data should not overwrite the last dump
        // 2. restore should happen before load and async load
        // 3. all queues should be restored sequencially to avoid conflict
        // TODO(zhengyu): we can parralize them but will increase complexity, so lets check the time cost
        // to see if any improvement is a necessary
        restore_lru_queues_from_disk(cache_lock);
    }
    RETURN_IF_ERROR(_storage->init(this));

    if (auto* fs_storage = dynamic_cast<FSFileCacheStorage*>(_storage.get())) {
        if (auto* meta_store = fs_storage->get_meta_store()) {
            _ttl_mgr = std::make_unique<BlockFileCacheTtlMgr>(this, meta_store);
        }
    }

    _cache_background_monitor_thread = std::thread(&BlockFileCache::run_background_monitor, this);
    _cache_background_gc_thread = std::thread(&BlockFileCache::run_background_gc, this);
    _cache_background_evict_in_advance_thread =
            std::thread(&BlockFileCache::run_background_evict_in_advance, this);
    _cache_background_block_lru_update_thread =
            std::thread(&BlockFileCache::run_background_block_lru_update, this);

    // Initialize LRU dump thread and restore queues
    _cache_background_lru_dump_thread = std::thread(&BlockFileCache::run_background_lru_dump, this);
    _cache_background_lru_log_replay_thread =
            std::thread(&BlockFileCache::run_background_lru_log_replay, this);

    return Status::OK();
}

void BlockFileCache::update_block_lru(FileBlockSPtr block,
                                      std::lock_guard<std::mutex>& cache_lock) {
    FileBlockCell* cell = block->cell;
    if (cell) {
        if (cell->queue_iterator) {
            auto& queue = get_queue(block->cache_type());
            queue.move_to_end(*cell->queue_iterator, cache_lock);
            _lru_recorder->record_queue_event(block->cache_type(), CacheLRULogType::MOVETOBACK,
                                              block->_key.hash, block->_key.offset,
                                              block->_block_range.size());
        }
        cell->update_atime();
    }
}

void BlockFileCache::use_cell(const FileBlockCell& cell, FileBlocks* result, bool move_iter_flag,
                              std::lock_guard<std::mutex>& cache_lock) {
    if (result) {
        result->push_back(cell.file_block);
    }

    auto& queue = get_queue(cell.file_block->cache_type());
    /// Move to the end of the queue. The iterator remains valid.
    if (cell.queue_iterator && move_iter_flag) {
        queue.move_to_end(*cell.queue_iterator, cache_lock);
        _lru_recorder->record_queue_event(cell.file_block->cache_type(),
                                          CacheLRULogType::MOVETOBACK, cell.file_block->_key.hash,
                                          cell.file_block->_key.offset, cell.size());
    }

    cell.update_atime();
}

template <class T>
    requires IsXLock<T>
FileBlockCell* BlockFileCache::get_cell(const UInt128Wrapper& hash, size_t offset,
                                        T& /* cache_lock */) {
    auto it = _files.find(hash);
    if (it == _files.end()) {
        return nullptr;
    }

    auto& offsets = it->second;
    auto cell_it = offsets.find(offset);
    if (cell_it == offsets.end()) {
        return nullptr;
    }

    return &cell_it->second;
}

bool BlockFileCache::need_to_move(FileCacheType cell_type, FileCacheType query_type) const {
    return query_type != FileCacheType::DISPOSABLE && cell_type != FileCacheType::DISPOSABLE;
}

FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& hash, const CacheContext& context,
                                    const FileBlock::Range& range,
                                    std::lock_guard<std::mutex>& cache_lock) {
    /// Given range = [left, right] and non-overlapping ordered set of file blocks,
    /// find list [block1, ..., blockN] of blocks which intersect with given range.
    auto it = _files.find(hash);
    if (it == _files.end()) {
        if (_async_open_done) {
            return {};
        }
        FileCacheKey key;
        key.hash = hash;
        key.meta.type = context.cache_type;
        key.meta.expiration_time = context.expiration_time;
        key.meta.tablet_id = context.tablet_id;
        _storage->load_blocks_directly_unlocked(this, key, cache_lock);

        it = _files.find(hash);
        if (it == _files.end()) [[unlikely]] {
            return {};
        }
    }

    auto& file_blocks = it->second;
    if (file_blocks.empty()) {
        LOG(WARNING) << "file_blocks is empty for hash=" << hash.to_string()
                     << " cache type=" << context.cache_type
                     << " cache expiration time=" << context.expiration_time
                     << " cache range=" << range.left << " " << range.right
                     << " query id=" << context.query_id;
        DCHECK(false);
        _files.erase(hash);
        return {};
    }

    FileBlocks result;
    auto block_it = file_blocks.lower_bound(range.left);
    if (block_it == file_blocks.end()) {
        /// N - last cached block for given file hash, block{N}.offset < range.left:
        ///   block{N}                       block{N}
        /// [________                         [_______]
        ///     [__________]         OR                  [________]
        ///     ^                                        ^
        ///     range.left                               range.left

        const auto& cell = file_blocks.rbegin()->second;
        if (cell.file_block->range().right < range.left) {
            return {};
        }

        use_cell(cell, &result, need_to_move(cell.file_block->cache_type(), context.cache_type),
                 cache_lock);
    } else { /// block_it <-- segmment{k}
        if (block_it != file_blocks.begin()) {
            const auto& prev_cell = std::prev(block_it)->second;
            const auto& prev_cell_range = prev_cell.file_block->range();

            if (range.left <= prev_cell_range.right) {
                ///   block{k-1}  block{k}
                ///   [________]   [_____
                ///       [___________
                ///       ^
                ///       range.left

                use_cell(prev_cell, &result,
                         need_to_move(prev_cell.file_block->cache_type(), context.cache_type),
                         cache_lock);
            }
        }

        ///  block{k} ...       block{k-1}  block{k}                      block{k}
        ///  [______              [______]     [____                        [________
        ///  [_________     OR              [________      OR    [______]   ^
        ///  ^                              ^                           ^   block{k}.offset
        ///  range.left                     range.left                  range.right

        while (block_it != file_blocks.end()) {
            const auto& cell = block_it->second;
            if (range.right < cell.file_block->range().left) {
                break;
            }

            use_cell(cell, &result, need_to_move(cell.file_block->cache_type(), context.cache_type),
                     cache_lock);
            ++block_it;
        }
    }

    return result;
}

void BlockFileCache::add_need_update_lru_block(FileBlockSPtr block) {
    if (_need_update_lru_blocks.insert(std::move(block))) {
        *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
    }
}

std::string BlockFileCache::clear_file_cache_async() {
    LOG(INFO) << "start clear_file_cache_async, path=" << _cache_base_path;
    _lru_dumper->remove_lru_dump_files();
    int64_t num_cells_all = 0;
    int64_t num_cells_to_delete = 0;
    int64_t num_cells_wait_recycle = 0;
    int64_t num_files_all = 0;
    TEST_SYNC_POINT_CALLBACK("BlockFileCache::clear_file_cache_async");
    {
        SCOPED_CACHE_LOCK(_mutex, this);

        std::vector<FileBlockCell*> deleting_cells;
        for (auto& [_, offset_to_cell] : _files) {
            ++num_files_all;
            for (auto& [_1, cell] : offset_to_cell) {
                ++num_cells_all;
                deleting_cells.push_back(&cell);
            }
        }

        // we cannot delete the element in the loop above, because it will break the iterator
        for (auto& cell : deleting_cells) {
            if (!cell->releasable()) {
                LOG(INFO) << "cell is not releasable, hash="
                          << " offset=" << cell->file_block->offset();
                cell->file_block->set_deleting();
                ++num_cells_wait_recycle;
                continue;
            }
            FileBlockSPtr file_block = cell->file_block;
            if (file_block) {
                std::lock_guard block_lock(file_block->_mutex);
                remove(file_block, cache_lock, block_lock, false);
                ++num_cells_to_delete;
            }
        }
        clear_need_update_lru_blocks();
    }

    std::stringstream ss;
    ss << "finish clear_file_cache_async, path=" << _cache_base_path
       << " num_files_all=" << num_files_all << " num_cells_all=" << num_cells_all
       << " num_cells_to_delete=" << num_cells_to_delete
       << " num_cells_wait_recycle=" << num_cells_wait_recycle;
    auto msg = ss.str();
    LOG(INFO) << msg;
    _lru_dumper->remove_lru_dump_files();
    return msg;
}

FileBlocks BlockFileCache::split_range_into_cells(const UInt128Wrapper& hash,
                                                  const CacheContext& context, size_t offset,
                                                  size_t size, FileBlock::State state,
                                                  std::lock_guard<std::mutex>& cache_lock) {
    DCHECK(size > 0);

    auto current_pos = offset;
    auto end_pos_non_included = offset + size;

    size_t current_size = 0;
    size_t remaining_size = size;

    FileBlocks file_blocks;
    while (current_pos < end_pos_non_included) {
        current_size = std::min(remaining_size, _max_file_block_size);
        remaining_size -= current_size;
        state = try_reserve(hash, context, current_pos, current_size, cache_lock)
                        ? state
                        : FileBlock::State::SKIP_CACHE;
        if (state == FileBlock::State::SKIP_CACHE) [[unlikely]] {
            FileCacheKey key;
            key.hash = hash;
            key.offset = current_pos;
            key.meta.type = context.cache_type;
            key.meta.expiration_time = context.expiration_time;
            key.meta.tablet_id = context.tablet_id;
            auto file_block = std::make_shared<FileBlock>(key, current_size, this,
                                                          FileBlock::State::SKIP_CACHE);
            file_blocks.push_back(std::move(file_block));
        } else {
            auto* cell = add_cell(hash, context, current_pos, current_size, state, cache_lock);
            if (cell) {
                file_blocks.push_back(cell->file_block);
                if (!context.is_cold_data) {
                    cell->update_atime();
                }
            }
            if (_ttl_mgr && context.tablet_id != 0) {
                _ttl_mgr->register_tablet_id(context.tablet_id);
            }
        }

        current_pos += current_size;
    }

    DCHECK(file_blocks.empty() || offset + size - 1 == file_blocks.back()->range().right);
    return file_blocks;
}

void BlockFileCache::fill_holes_with_empty_file_blocks(FileBlocks& file_blocks,
                                                       const UInt128Wrapper& hash,
                                                       const CacheContext& context,
                                                       const FileBlock::Range& range,
                                                       std::lock_guard<std::mutex>& cache_lock) {
    /// There are blocks [block1, ..., blockN]
    /// (non-overlapping, non-empty, ascending-ordered) which (maybe partially)
    /// intersect with given range.

    /// It can have holes:
    /// [____________________]         -- requested range
    ///     [____]  [_]   [_________]  -- intersecting cache [block1, ..., blockN]
    ///
    /// For each such hole create a cell with file block state EMPTY.

    auto it = file_blocks.begin();
    auto block_range = (*it)->range();

    size_t current_pos = 0;
    if (block_range.left < range.left) {
        ///    [_______     -- requested range
        /// [_______
        /// ^
        /// block1

        current_pos = block_range.right + 1;
        ++it;
    } else {
        current_pos = range.left;
    }

    while (current_pos <= range.right && it != file_blocks.end()) {
        block_range = (*it)->range();

        if (current_pos == block_range.left) {
            current_pos = block_range.right + 1;
            ++it;
            continue;
        }

        DCHECK(current_pos < block_range.left);

        auto hole_size = block_range.left - current_pos;

        file_blocks.splice(it, split_range_into_cells(hash, context, current_pos, hole_size,
                                                      FileBlock::State::EMPTY, cache_lock));

        current_pos = block_range.right + 1;
        ++it;
    }

    if (current_pos <= range.right) {
        ///   ________]     -- requested range
        ///   _____]
        ///        ^
        /// blockN

        auto hole_size = range.right - current_pos + 1;

        file_blocks.splice(file_blocks.end(),
                           split_range_into_cells(hash, context, current_pos, hole_size,
                                                  FileBlock::State::EMPTY, cache_lock));
    }
}

FileBlocksHolder BlockFileCache::get_or_set(const UInt128Wrapper& hash, size_t offset, size_t size,
                                            CacheContext& context) {
    FileBlock::Range range(offset, offset + size - 1);

    ReadStatistics* stats = context.stats;
    DCHECK(stats != nullptr);
    MonotonicStopWatch sw;
    sw.start();
    std::lock_guard cache_lock(_mutex);
    stats->lock_wait_timer += sw.elapsed_time();
    FileBlocks file_blocks;
    int64_t duration = 0;
    {
        SCOPED_RAW_TIMER(&duration);
        /// Get all blocks which intersect with the given range.
        {
            SCOPED_RAW_TIMER(&stats->get_timer);
            file_blocks = get_impl(hash, context, range, cache_lock);
        }

        if (file_blocks.empty()) {
            SCOPED_RAW_TIMER(&stats->set_timer);
            file_blocks = split_range_into_cells(hash, context, offset, size,
                                                 FileBlock::State::EMPTY, cache_lock);
        } else {
            SCOPED_RAW_TIMER(&stats->set_timer);
            fill_holes_with_empty_file_blocks(file_blocks, hash, context, range, cache_lock);
        }
        DCHECK(!file_blocks.empty());
        *_num_read_blocks << file_blocks.size();
        if (!context.is_warmup) {
            *_no_warmup_num_read_blocks << file_blocks.size();
        }
        for (auto& block : file_blocks) {
            size_t block_size = block->range().size();
            *_total_read_size_metrics << block_size;
            if (block->state_unsafe() == FileBlock::State::DOWNLOADED) {
                *_num_hit_blocks << 1;
                *_total_hit_size_metrics << block_size;
                if (!context.is_warmup) {
                    *_no_warmup_num_hit_blocks << 1;
                }
            }
        }
    }
    *_get_or_set_latency_us << (duration / 1000);
    return FileBlocksHolder(std::move(file_blocks));
}

FileBlockCell* BlockFileCache::add_cell(const UInt128Wrapper& hash, const CacheContext& context,
                                        size_t offset, size_t size, FileBlock::State state,
                                        std::lock_guard<std::mutex>& cache_lock) {
    /// Create a file block cell and put it in `files` map by [hash][offset].
    if (size == 0) {
        return nullptr; /// Empty files are not cached.
    }

    VLOG_DEBUG << "Adding file block to cache. size=" << size << " hash=" << hash.to_string()
               << " offset=" << offset << " cache_type=" << cache_type_to_string(context.cache_type)
               << " expiration_time=" << context.expiration_time
               << " tablet_id=" << context.tablet_id;

    if (size > 1024 * 1024 * 1024) {
        LOG(WARNING) << "File block size is too large for a block. size=" << size
                     << " hash=" << hash.to_string() << " offset=" << offset
                     << " stack:" << get_stack_trace();
    }

    auto& offsets = _files[hash];
    auto itr = offsets.find(offset);
    if (itr != offsets.end()) {
        VLOG_DEBUG << "Cache already exists for hash: " << hash.to_string()
                   << ", offset: " << offset << ", size: " << size
                   << ".\nCurrent cache structure: " << dump_structure_unlocked(hash, cache_lock);
        return &(itr->second);
    }

    FileCacheKey key;
    key.hash = hash;
    key.offset = offset;
    key.meta.type = context.cache_type;
    key.meta.expiration_time = context.expiration_time;
    key.meta.tablet_id = context.tablet_id;
    FileBlockCell cell(std::make_shared<FileBlock>(key, size, this, state), cache_lock);
    Status st;
    if (context.expiration_time == 0 && context.cache_type == FileCacheType::TTL) {
        st = cell.file_block->change_cache_type_lock(FileCacheType::NORMAL, cache_lock);
    } else if (context.cache_type != FileCacheType::TTL && context.expiration_time != 0) {
        st = cell.file_block->change_cache_type_lock(FileCacheType::TTL, cache_lock);
    }
    if (!st.ok()) {
        LOG(WARNING) << "Cannot change cache type. expiration_time=" << context.expiration_time
                     << " cache_type=" << cache_type_to_string(context.cache_type)
                     << " error=" << st.msg();
    }

    auto& queue = get_queue(cell.file_block->cache_type());
    cell.queue_iterator = queue.add(hash, offset, size, cache_lock);
    _lru_recorder->record_queue_event(cell.file_block->cache_type(), CacheLRULogType::ADD,
                                      cell.file_block->get_hash_value(), cell.file_block->offset(),
                                      cell.size());

    if (cell.file_block->cache_type() == FileCacheType::TTL) {
        _cur_ttl_size += cell.size();
    }
    auto [it, _] = offsets.insert(std::make_pair(offset, std::move(cell)));
    _cur_cache_size += size;
    return &(it->second);
}

size_t BlockFileCache::try_release() {
    SCOPED_CACHE_LOCK(_mutex, this);
    std::vector<FileBlockCell*> trash;
    for (auto& [hash, blocks] : _files) {
        for (auto& [offset, cell] : blocks) {
            if (cell.releasable()) {
                trash.emplace_back(&cell);
            } else {
                cell.file_block->set_deleting();
            }
        }
    }
    size_t remove_size = 0;
    for (auto& cell : trash) {
        FileBlockSPtr file_block = cell->file_block;
        std::lock_guard lc(cell->file_block->_mutex);
        remove_size += file_block->range().size();
        remove(file_block, cache_lock, lc);
        VLOG_DEBUG << "try_release " << _cache_base_path
                   << " hash=" << file_block->get_hash_value().to_string()
                   << " offset=" << file_block->offset();
    }
    *_evict_by_try_release << remove_size;
    LOG(INFO) << "Released " << trash.size() << " blocks in file cache " << _cache_base_path;
    return trash.size();
}

LRUQueue& BlockFileCache::get_queue(FileCacheType type) {
    switch (type) {
    case FileCacheType::INDEX:
        return _index_queue;
    case FileCacheType::DISPOSABLE:
        return _disposable_queue;
    case FileCacheType::NORMAL:
        return _normal_queue;
    case FileCacheType::TTL:
        return _ttl_queue;
    default:
        DCHECK(false);
    }
    return _normal_queue;
}

const LRUQueue& BlockFileCache::get_queue(FileCacheType type) const {
    switch (type) {
    case FileCacheType::INDEX:
        return _index_queue;
    case FileCacheType::DISPOSABLE:
        return _disposable_queue;
    case FileCacheType::NORMAL:
        return _normal_queue;
    case FileCacheType::TTL:
        return _ttl_queue;
    default:
        DCHECK(false);
    }
    return _normal_queue;
}

void BlockFileCache::remove_file_blocks(std::vector<FileBlockCell*>& to_evict,
                                        std::lock_guard<std::mutex>& cache_lock, bool sync,
                                        std::string& reason) {
    auto remove_file_block_if = [&](FileBlockCell* cell) {
        FileBlockSPtr file_block = cell->file_block;
        if (file_block) {
            std::lock_guard block_lock(file_block->_mutex);
            remove(file_block, cache_lock, block_lock, sync);
            VLOG_DEBUG << "remove_file_blocks"
                       << " hash=" << file_block->get_hash_value().to_string()
                       << " offset=" << file_block->offset() << " reason=" << reason;
        }
    };
    std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if);
}

void BlockFileCache::find_evict_candidates(LRUQueue& queue, size_t size, size_t cur_cache_size,
                                           size_t& removed_size,
                                           std::vector<FileBlockCell*>& to_evict,
                                           std::lock_guard<std::mutex>& cache_lock,
                                           size_t& cur_removed_size, bool evict_in_advance) {
    for (const auto& [entry_key, entry_offset, entry_size] : queue) {
        if (!is_overflow(removed_size, size, cur_cache_size, evict_in_advance)) {
            break;
        }
        auto* cell = get_cell(entry_key, entry_offset, cache_lock);

        DCHECK(cell) << "Cache became inconsistent. key: " << entry_key.to_string()
                     << ", offset: " << entry_offset;

        size_t cell_size = cell->size();
        DCHECK(entry_size == cell_size);

        if (cell->releasable()) {
            auto& file_block = cell->file_block;

            std::lock_guard block_lock(file_block->_mutex);
            DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
            to_evict.push_back(cell);
            removed_size += cell_size;
            cur_removed_size += cell_size;
        }
    }
}

// 1. if async load file cache not finish
//     a. evict from lru queue
// 2. if ttl cache
//     a. evict from disposable/normal/index queue one by one
// 3. if dont reach query limit or dont have query limit
//     a. evict from other queue
//     b. evict from current queue
//         a.1 if the data belong write, then just evict cold data
// 4. if reach query limit
//     a. evict from query queue
//     b. evict from other queue
bool BlockFileCache::try_reserve(const UInt128Wrapper& hash, const CacheContext& context,
                                 size_t offset, size_t size,
                                 std::lock_guard<std::mutex>& cache_lock) {
    if (!_async_open_done) {
        return try_reserve_during_async_load(size, cache_lock);
    }
    // use this strategy in scenarios where there is insufficient disk capacity or insufficient number of inodes remaining
    // directly eliminate 5 times the size of the space
    if (_disk_resource_limit_mode) {
        size = 5 * size;
    }

    auto query_context = config::enable_file_cache_query_limit &&
                                         (context.query_id.hi != 0 || context.query_id.lo != 0)
                                 ? get_query_context(context.query_id, cache_lock)
                                 : nullptr;
    if (!query_context) {
        return try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock);
    } else if (query_context->get_cache_size(cache_lock) + size <=
               query_context->get_max_cache_size()) {
        return try_reserve_for_lru(hash, query_context, context, offset, size, cache_lock);
    }
    int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
                               std::chrono::steady_clock::now().time_since_epoch())
                               .count();
    auto& queue = get_queue(context.cache_type);
    size_t removed_size = 0;
    size_t ghost_remove_size = 0;
    size_t queue_size = queue.get_capacity(cache_lock);
    size_t cur_cache_size = _cur_cache_size;
    size_t query_context_cache_size = query_context->get_cache_size(cache_lock);

    std::vector<LRUQueue::Iterator> ghost;
    std::vector<FileBlockCell*> to_evict;

    size_t max_size = queue.get_max_size();
    auto is_overflow = [&] {
        return _disk_resource_limit_mode ? removed_size < size
                                         : cur_cache_size + size - removed_size > _capacity ||
                                                   (queue_size + size - removed_size > max_size) ||
                                                   (query_context_cache_size + size -
                                                            (removed_size + ghost_remove_size) >
                                                    query_context->get_max_cache_size());
    };

    /// Select the cache from the LRU queue held by query for expulsion.
    for (auto iter = query_context->queue().begin(); iter != query_context->queue().end(); iter++) {
        if (!is_overflow()) {
            break;
        }

        auto* cell = get_cell(iter->hash, iter->offset, cache_lock);

        if (!cell) {
            /// The cache corresponding to this record may be swapped out by
            /// other queries, so it has become invalid.
            ghost.push_back(iter);
            ghost_remove_size += iter->size;
        } else {
            size_t cell_size = cell->size();
            DCHECK(iter->size == cell_size);

            if (cell->releasable()) {
                auto& file_block = cell->file_block;

                std::lock_guard block_lock(file_block->_mutex);
                DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
                to_evict.push_back(cell);
                removed_size += cell_size;
            }
        }
    }

    auto remove_file_block_if = [&](FileBlockCell* cell) {
        FileBlockSPtr file_block = cell->file_block;
        if (file_block) {
            query_context->remove(file_block->get_hash_value(), file_block->offset(), cache_lock);
            std::lock_guard block_lock(file_block->_mutex);
            remove(file_block, cache_lock, block_lock);
        }
    };

    for (auto& iter : ghost) {
        query_context->remove(iter->hash, iter->offset, cache_lock);
    }

    std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if);

    if (is_overflow() &&
        !try_reserve_from_other_queue(context.cache_type, size, cur_time, cache_lock)) {
        return false;
    }
    query_context->reserve(hash, offset, size, cache_lock);
    return true;
}

void BlockFileCache::try_evict_in_advance(size_t size, std::lock_guard<std::mutex>& cache_lock) {
    UInt128Wrapper hash = UInt128Wrapper();
    size_t offset = 0;
    CacheContext context;
    /* we pick NORMAL and TTL cache to evict in advance
     * we reserve for them but won't acutually give space to them
     * on the contrary, NORMAL and TTL may sacrifice by LRU evicting themselves
     * other cache types cannot be exempted because we will evict what they have stolen before LRU evicting
     * in summary: all cache types will shrink somewhat, and NORMAL and TTL shrink the most, to make sure the cache is not full
     */
    context.cache_type = FileCacheType::NORMAL;
    try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock, true);
    context.cache_type = FileCacheType::TTL;
    try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock, true);
}

// remove specific cache synchronously, for critical operations
// if in use, cache meta will be deleted after use and the block file is then deleted asynchronously
void BlockFileCache::remove_if_cached(const UInt128Wrapper& file_key) {
    std::string reason = "remove_if_cached";
    SCOPED_CACHE_LOCK(_mutex, this);
    auto iter = _files.find(file_key);
    std::vector<FileBlockCell*> to_remove;
    if (iter != _files.end()) {
        for (auto& [_, cell] : iter->second) {
            if (cell.releasable()) {
                to_remove.push_back(&cell);
            } else {
                cell.file_block->set_deleting();
            }
        }
    }
    remove_file_blocks(to_remove, cache_lock, true, reason);
}

// the async version of remove_if_cached, for background operations
// cache meta is deleted synchronously if not in use, and the block file is deleted asynchronously
// if in use, cache meta will be deleted after use and the block file is then deleted asynchronously
void BlockFileCache::remove_if_cached_async(const UInt128Wrapper& file_key) {
    std::string reason = "remove_if_cached_async";
    SCOPED_CACHE_LOCK(_mutex, this);

    auto iter = _files.find(file_key);
    std::vector<FileBlockCell*> to_remove;
    if (iter != _files.end()) {
        for (auto& [_, cell] : iter->second) {
            *_gc_evict_bytes_metrics << cell.size();
            *_gc_evict_count_metrics << 1;
            if (cell.releasable()) {
                to_remove.push_back(&cell);
            } else {
                cell.file_block->set_deleting();
            }
        }
    }
    remove_file_blocks(to_remove, cache_lock, false, reason);
}

std::vector<FileCacheType> BlockFileCache::get_other_cache_type_without_ttl(
        FileCacheType cur_cache_type) {
    switch (cur_cache_type) {
    case FileCacheType::TTL:
        return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL, FileCacheType::INDEX};
    case FileCacheType::INDEX:
        return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL};
    case FileCacheType::NORMAL:
        return {FileCacheType::DISPOSABLE, FileCacheType::INDEX};
    case FileCacheType::DISPOSABLE:
        return {FileCacheType::NORMAL, FileCacheType::INDEX};
    default:
        return {};
    }
    return {};
}

std::vector<FileCacheType> BlockFileCache::get_other_cache_type(FileCacheType cur_cache_type) {
    switch (cur_cache_type) {
    case FileCacheType::TTL:
        return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL, FileCacheType::INDEX};
    case FileCacheType::INDEX:
        return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL, FileCacheType::TTL};
    case FileCacheType::NORMAL:
        return {FileCacheType::DISPOSABLE, FileCacheType::INDEX, FileCacheType::TTL};
    case FileCacheType::DISPOSABLE:
        return {FileCacheType::NORMAL, FileCacheType::INDEX, FileCacheType::TTL};
    default:
        return {};
    }
    return {};
}

void BlockFileCache::reset_range(const UInt128Wrapper& hash, size_t offset, size_t old_size,
                                 size_t new_size, std::lock_guard<std::mutex>& cache_lock) {
    DCHECK(_files.find(hash) != _files.end() &&
           _files.find(hash)->second.find(offset) != _files.find(hash)->second.end());
    FileBlockCell* cell = get_cell(hash, offset, cache_lock);
    DCHECK(cell != nullptr);
    if (cell == nullptr) {
        LOG(WARNING) << "reset_range skipped because cache cell is missing. hash="
                     << hash.to_string() << " offset=" << offset << " old_size=" << old_size
                     << " new_size=" << new_size;
        return;
    }
    if (cell->queue_iterator) {
        auto& queue = get_queue(cell->file_block->cache_type());
        DCHECK(queue.contains(hash, offset, cache_lock));
        queue.resize(*cell->queue_iterator, new_size, cache_lock);
        _lru_recorder->record_queue_event(cell->file_block->cache_type(), CacheLRULogType::RESIZE,
                                          cell->file_block->get_hash_value(),
                                          cell->file_block->offset(), new_size);
    }
    _cur_cache_size -= old_size;
    _cur_cache_size += new_size;
}

bool BlockFileCache::try_reserve_from_other_queue_by_time_interval(
        FileCacheType cur_type, std::vector<FileCacheType> other_cache_types, size_t size,
        int64_t cur_time, std::lock_guard<std::mutex>& cache_lock, bool evict_in_advance) {
    size_t removed_size = 0;
    size_t cur_cache_size = _cur_cache_size;
    std::vector<FileBlockCell*> to_evict;
    for (FileCacheType cache_type : other_cache_types) {
        auto& queue = get_queue(cache_type);
        size_t remove_size_per_type = 0;
        for (const auto& [entry_key, entry_offset, entry_size] : queue) {
            if (!is_overflow(removed_size, size, cur_cache_size, evict_in_advance)) {
                break;
            }
            auto* cell = get_cell(entry_key, entry_offset, cache_lock);
            DCHECK(cell) << "Cache became inconsistent. UInt128Wrapper: " << entry_key.to_string()
                         << ", offset: " << entry_offset;

            size_t cell_size = cell->size();
            DCHECK(entry_size == cell_size);

            if (cell->atime == 0 ? true : cell->atime + queue.get_hot_data_interval() > cur_time) {
                break;
            }

            if (cell->releasable()) {
                auto& file_block = cell->file_block;
                std::lock_guard block_lock(file_block->_mutex);
                DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
                to_evict.push_back(cell);
                removed_size += cell_size;
                remove_size_per_type += cell_size;
            }
        }
        *(_evict_by_time_metrics_matrix[cache_type][cur_type]) << remove_size_per_type;
    }
    bool is_sync_removal = !evict_in_advance;
    std::string reason = std::string("try_reserve_by_time ") +
                         " evict_in_advance=" + (evict_in_advance ? "true" : "false");
    remove_file_blocks(to_evict, cache_lock, is_sync_removal, reason);

    return !is_overflow(removed_size, size, cur_cache_size, evict_in_advance);
}

bool BlockFileCache::is_overflow(size_t removed_size, size_t need_size, size_t cur_cache_size,
                                 bool evict_in_advance) const {
    bool ret = false;
    if (evict_in_advance) { // we don't need to check _need_evict_cache_in_advance
        ret = (removed_size < need_size);
        return ret;
    }
    if (_disk_resource_limit_mode) {
        ret = (removed_size < need_size);
    } else {
        ret = (cur_cache_size + need_size - removed_size > _capacity);
    }
    return ret;
}

bool BlockFileCache::try_reserve_from_other_queue_by_size(
        FileCacheType cur_type, std::vector<FileCacheType> other_cache_types, size_t size,
        std::lock_guard<std::mutex>& cache_lock, bool evict_in_advance) {
    size_t removed_size = 0;
    size_t cur_cache_size = _cur_cache_size;
    std::vector<FileBlockCell*> to_evict;
    // we follow the privilege defined in get_other_cache_types to evict
    for (FileCacheType cache_type : other_cache_types) {
        auto& queue = get_queue(cache_type);

        // we will not drain each of them to the bottom -- i.e., we only
        // evict what they have stolen.
        size_t cur_queue_size = queue.get_capacity(cache_lock);
        size_t cur_queue_max_size = queue.get_max_size();
        if (cur_queue_size <= cur_queue_max_size) {
            continue;
        }
        size_t cur_removed_size = 0;
        find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock,
                              cur_removed_size, evict_in_advance);
        *(_evict_by_size_metrics_matrix[cache_type][cur_type]) << cur_removed_size;
    }
    bool is_sync_removal = !evict_in_advance;
    std::string reason = std::string("try_reserve_by_size") +
                         " evict_in_advance=" + (evict_in_advance ? "true" : "false");
    remove_file_blocks(to_evict, cache_lock, is_sync_removal, reason);
    return !is_overflow(removed_size, size, cur_cache_size, evict_in_advance);
}

bool BlockFileCache::try_reserve_from_other_queue(FileCacheType cur_cache_type, size_t size,
                                                  int64_t cur_time,
                                                  std::lock_guard<std::mutex>& cache_lock,
                                                  bool evict_in_advance) {
    // currently, TTL cache is not considered as a candidate
    auto other_cache_types = get_other_cache_type_without_ttl(cur_cache_type);
    bool reserve_success = try_reserve_from_other_queue_by_time_interval(
            cur_cache_type, other_cache_types, size, cur_time, cache_lock, evict_in_advance);
    if (reserve_success || !config::file_cache_enable_evict_from_other_queue_by_size) {
        return reserve_success;
    }

    other_cache_types = get_other_cache_type(cur_cache_type);
    auto& cur_queue = get_queue(cur_cache_type);
    size_t cur_queue_size = cur_queue.get_capacity(cache_lock);
    size_t cur_queue_max_size = cur_queue.get_max_size();
    // Hit the soft limit by self, cannot remove from other queues
    if (_cur_cache_size + size > _capacity && cur_queue_size + size > cur_queue_max_size) {
        return false;
    }
    return try_reserve_from_other_queue_by_size(cur_cache_type, other_cache_types, size, cache_lock,
                                                evict_in_advance);
}

bool BlockFileCache::try_reserve_for_lru(const UInt128Wrapper& hash,
                                         QueryFileCacheContextPtr query_context,
                                         const CacheContext& context, size_t offset, size_t size,
                                         std::lock_guard<std::mutex>& cache_lock,
                                         bool evict_in_advance) {
    int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
                               std::chrono::steady_clock::now().time_since_epoch())
                               .count();
    if (!try_reserve_from_other_queue(context.cache_type, size, cur_time, cache_lock,
                                      evict_in_advance)) {
        auto& queue = get_queue(context.cache_type);
        size_t removed_size = 0;
        size_t cur_cache_size = _cur_cache_size;

        std::vector<FileBlockCell*> to_evict;
        size_t cur_removed_size = 0;
        find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock,
                              cur_removed_size, evict_in_advance);
        bool is_sync_removal = !evict_in_advance;
        std::string reason = std::string("try_reserve for cache type ") +
                             cache_type_to_string(context.cache_type) +
                             " evict_in_advance=" + (evict_in_advance ? "true" : "false");
        remove_file_blocks(to_evict, cache_lock, is_sync_removal, reason);
        *(_evict_by_self_lru_metrics_matrix[context.cache_type]) << cur_removed_size;

        if (is_overflow(removed_size, size, cur_cache_size, evict_in_advance)) {
            return false;
        }
    }

    if (query_context) {
        query_context->reserve(hash, offset, size, cache_lock);
    }
    return true;
}

template <class T, class U>
    requires IsXLock<T> && IsXLock<U>
void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U& block_lock, bool sync) {
    auto hash = file_block->get_hash_value();
    auto offset = file_block->offset();
    auto type = file_block->cache_type();
    auto expiration_time = file_block->expiration_time();
    auto tablet_id = file_block->tablet_id();
    auto* cell = get_cell(hash, offset, cache_lock);
    file_block->cell = nullptr;
    DCHECK(cell);
    DCHECK(cell->queue_iterator);
    if (cell->queue_iterator) {
        auto& queue = get_queue(file_block->cache_type());
        queue.remove(*cell->queue_iterator, cache_lock);
        _lru_recorder->record_queue_event(file_block->cache_type(), CacheLRULogType::REMOVE,
                                          cell->file_block->get_hash_value(),
                                          cell->file_block->offset(), cell->size());
    }
    *_queue_evict_size_metrics[static_cast<int>(file_block->cache_type())]
            << file_block->range().size();
    *_total_evict_size_metrics << file_block->range().size();

    VLOG_DEBUG << "Removing file block from cache. hash: " << hash.to_string()
               << ", offset: " << offset << ", size: " << file_block->range().size()
               << ", type: " << cache_type_to_string(type);

    if (file_block->state_unlock(block_lock) == FileBlock::State::DOWNLOADED) {
        FileCacheKey key;
        key.hash = hash;
        key.offset = offset;
        key.meta.type = type;
        key.meta.expiration_time = expiration_time;
        key.meta.tablet_id = tablet_id;
        if (sync) {
            int64_t duration_ns = 0;
            Status st;
            {
                SCOPED_RAW_TIMER(&duration_ns);
                st = _storage->remove(key);
            }
            *_storage_sync_remove_latency_us << (duration_ns / 1000);
            if (!st.ok()) {
                LOG_WARNING("").error(st);
            }
        } else {
            // the file will be deleted in the bottom half
            // so there will be a window that the file is not in the cache but still in the storage
            // but it's ok, because the rowset is stale already
            bool ret = _recycle_keys.enqueue(key);
            if (ret) [[likely]] {
                *_recycle_keys_length_recorder << _recycle_keys.size_approx();
            } else {
                LOG_WARNING("Failed to push recycle key to queue, do it synchronously");
                int64_t duration_ns = 0;
                Status st;
                {
                    SCOPED_RAW_TIMER(&duration_ns);
                    st = _storage->remove(key);
                }
                *_storage_retry_sync_remove_latency_us << (duration_ns / 1000);
                if (!st.ok()) {
                    LOG_WARNING("").error(st);
                }
            }
        }
    } else if (file_block->state_unlock(block_lock) == FileBlock::State::DOWNLOADING) {
        file_block->set_deleting();
        return;
    }
    _cur_cache_size -= file_block->range().size();
    if (FileCacheType::TTL == type) {
        _cur_ttl_size -= file_block->range().size();
    }
    auto it = _files.find(hash);
    if (it != _files.end()) {
        it->second.erase(file_block->offset());
        if (it->second.empty()) {
            _files.erase(hash);
        }
    }
    *_num_removed_blocks << 1;
}

size_t BlockFileCache::get_used_cache_size(FileCacheType cache_type) const {
    SCOPED_CACHE_LOCK(_mutex, this);
    return get_used_cache_size_unlocked(cache_type, cache_lock);
}

size_t BlockFileCache::get_used_cache_size_unlocked(FileCacheType cache_type,
                                                    std::lock_guard<std::mutex>& cache_lock) const {
    return get_queue(cache_type).get_capacity(cache_lock);
}

size_t BlockFileCache::get_available_cache_size(FileCacheType cache_type) const {
    SCOPED_CACHE_LOCK(_mutex, this);
    return get_available_cache_size_unlocked(cache_type, cache_lock);
}

size_t BlockFileCache::get_available_cache_size_unlocked(
        FileCacheType cache_type, std::lock_guard<std::mutex>& cache_lock) const {
    return get_queue(cache_type).get_max_element_size() -
           get_used_cache_size_unlocked(cache_type, cache_lock);
}

size_t BlockFileCache::get_file_blocks_num(FileCacheType cache_type) const {
    SCOPED_CACHE_LOCK(_mutex, this);
    return get_file_blocks_num_unlocked(cache_type, cache_lock);
}

size_t BlockFileCache::get_file_blocks_num_unlocked(FileCacheType cache_type,
                                                    std::lock_guard<std::mutex>& cache_lock) const {
    return get_queue(cache_type).get_elements_num(cache_lock);
}

FileBlockCell::FileBlockCell(FileBlockSPtr file_block, std::lock_guard<std::mutex>& cache_lock)
        : file_block(file_block) {
    file_block->cell = this;
    /**
     * Cell can be created with either DOWNLOADED or EMPTY file block's state.
     * File block acquires DOWNLOADING state and creates LRUQueue iterator on first
     * successful getOrSetDownaloder call.
     */

    switch (file_block->_download_state) {
    case FileBlock::State::DOWNLOADED:
    case FileBlock::State::EMPTY:
    case FileBlock::State::SKIP_CACHE: {
        break;
    }
    default:
        DCHECK(false) << "Can create cell with either EMPTY, DOWNLOADED, SKIP_CACHE state, got: "
                      << FileBlock::state_to_string(file_block->_download_state);
    }
    if (file_block->cache_type() == FileCacheType::TTL) {
        update_atime();
    }
}

LRUQueue::Iterator LRUQueue::add(const UInt128Wrapper& hash, size_t offset, size_t size,
                                 std::lock_guard<std::mutex>& /* cache_lock */) {
    cache_size += size;
    auto iter = queue.insert(queue.end(), FileKeyAndOffset(hash, offset, size));
    map.insert(std::make_pair(std::make_pair(hash, offset), iter));
    return iter;
}

void LRUQueue::remove_all(std::lock_guard<std::mutex>& /* cache_lock */) {
    queue.clear();
    map.clear();
    cache_size = 0;
}

void LRUQueue::move_to_end(Iterator queue_it, std::lock_guard<std::mutex>& /* cache_lock */) {
    queue.splice(queue.end(), queue, queue_it);
}

void LRUQueue::resize(Iterator queue_it, size_t new_size,
                      std::lock_guard<std::mutex>& /* cache_lock */) {
    cache_size -= queue_it->size;
    queue_it->size = new_size;
    cache_size += new_size;
}
bool LRUQueue::contains(const UInt128Wrapper& hash, size_t offset,
                        std::lock_guard<std::mutex>& /* cache_lock */) const {
    return map.find(std::make_pair(hash, offset)) != map.end();
}

LRUQueue::Iterator LRUQueue::get(const UInt128Wrapper& hash, size_t offset,
                                 std::lock_guard<std::mutex>& /* cache_lock */) const {
    auto itr = map.find(std::make_pair(hash, offset));
    if (itr != map.end()) {
        return itr->second;
    }
    return std::list<FileKeyAndOffset>::iterator();
}

std::string LRUQueue::to_string(std::lock_guard<std::mutex>& /* cache_lock */) const {
    std::string result;
    for (const auto& [hash, offset, size] : queue) {
        if (!result.empty()) {
            result += ", ";
        }
        result += fmt::format("{}: [{}, {}]", hash.to_string(), offset, offset + size - 1);
    }
    return result;
}

size_t LRUQueue::levenshtein_distance_from(LRUQueue& base,
                                           std::lock_guard<std::mutex>& cache_lock) {
    std::list<FileKeyAndOffset> target_queue = this->queue;
    std::list<FileKeyAndOffset> base_queue = base.queue;
    std::vector<FileKeyAndOffset> vec1(target_queue.begin(), target_queue.end());
    std::vector<FileKeyAndOffset> vec2(base_queue.begin(), base_queue.end());

    size_t m = vec1.size();
    size_t n = vec2.size();

    // Create a 2D vector (matrix) to store the Levenshtein distances
    // dp[i][j] will hold the distance between the first i elements of vec1 and the first j elements of vec2
    std::vector<std::vector<size_t>> dp(m + 1, std::vector<size_t>(n + 1, 0));

    // Initialize the first row and column of the matrix
    // The distance between an empty list and a list of length k is k (all insertions or deletions)
    for (size_t i = 0; i <= m; ++i) {
        dp[i][0] = i;
    }
    for (size_t j = 0; j <= n; ++j) {
        dp[0][j] = j;
    }

    // Fill the matrix using dynamic programming
    for (size_t i = 1; i <= m; ++i) {
        for (size_t j = 1; j <= n; ++j) {
            // Check if the current elements of both vectors are equal
            size_t cost = (vec1[i - 1].hash == vec2[j - 1].hash &&
                           vec1[i - 1].offset == vec2[j - 1].offset)
                                  ? 0
                                  : 1;
            // Calculate the minimum cost of three possible operations:
            // 1. Insertion: dp[i][j-1] + 1
            // 2. Deletion: dp[i-1][j] + 1
            // 3. Substitution: dp[i-1][j-1] + cost (0 if elements are equal, 1 if not)
            dp[i][j] = std::min({dp[i - 1][j] + 1, dp[i][j - 1] + 1, dp[i - 1][j - 1] + cost});
        }
    }
    // The bottom-right cell of the matrix contains the Levenshtein distance
    return dp[m][n];
}

std::string BlockFileCache::dump_structure(const UInt128Wrapper& hash) {
    SCOPED_CACHE_LOCK(_mutex, this);
    return dump_structure_unlocked(hash, cache_lock);
}

std::string BlockFileCache::dump_structure_unlocked(const UInt128Wrapper& hash,
                                                    std::lock_guard<std::mutex>&) {
    std::stringstream result;
    auto it = _files.find(hash);
    if (it == _files.end()) {
        return std::string("");
    }
    const auto& cells_by_offset = it->second;

    for (const auto& [_, cell] : cells_by_offset) {
        result << cell.file_block->get_info_for_log() << " "
               << cache_type_to_string(cell.file_block->cache_type()) << "\n";
    }

    return result.str();
}

std::string BlockFileCache::dump_single_cache_type(const UInt128Wrapper& hash, size_t offset) {
    SCOPED_CACHE_LOCK(_mutex, this);
    return dump_single_cache_type_unlocked(hash, offset, cache_lock);
}

std::string BlockFileCache::dump_single_cache_type_unlocked(const UInt128Wrapper& hash,
                                                            size_t offset,
                                                            std::lock_guard<std::mutex>&) {
    std::stringstream result;
    auto it = _files.find(hash);
    if (it == _files.end()) {
        return std::string("");
    }
    const auto& cells_by_offset = it->second;
    const auto& cell = cells_by_offset.find(offset);

    return cache_type_to_string(cell->second.file_block->cache_type());
}

void BlockFileCache::change_cache_type(const UInt128Wrapper& hash, size_t offset,
                                       FileCacheType new_type,
                                       std::lock_guard<std::mutex>& cache_lock) {
    if (auto iter = _files.find(hash); iter != _files.end()) {
        auto& file_blocks = iter->second;
        if (auto cell_it = file_blocks.find(offset); cell_it != file_blocks.end()) {
            FileBlockCell& cell = cell_it->second;
            auto& cur_queue = get_queue(cell.file_block->cache_type());
            DCHECK(cell.queue_iterator.has_value());
            cur_queue.remove(*cell.queue_iterator, cache_lock);
            _lru_recorder->record_queue_event(
                    cell.file_block->cache_type(), CacheLRULogType::REMOVE,
                    cell.file_block->get_hash_value(), cell.file_block->offset(), cell.size());
            auto& new_queue = get_queue(new_type);
            cell.queue_iterator =
                    new_queue.add(hash, offset, cell.file_block->range().size(), cache_lock);
            _lru_recorder->record_queue_event(new_type, CacheLRULogType::ADD,
                                              cell.file_block->get_hash_value(),
                                              cell.file_block->offset(), cell.size());
        }
    }
}

// @brief: get a path's disk capacity used percent, inode used percent
// @param: path
// @param: percent.first disk used percent, percent.second inode used percent
int disk_used_percentage(const std::string& path, std::pair<int, int>* percent) {
    struct statfs stat;
    int ret = statfs(path.c_str(), &stat);
    if (ret != 0) {
        return ret;
    }
    // https://github.com/coreutils/coreutils/blob/master/src/df.c#L1195
    // v->used = stat.f_blocks - stat.f_bfree
    // nonroot_total = stat.f_blocks - stat.f_bfree + stat.f_bavail
    uintmax_t u100 = (stat.f_blocks - stat.f_bfree) * 100;
    uintmax_t nonroot_total = stat.f_blocks - stat.f_bfree + stat.f_bavail;
    int capacity_percentage = int(u100 / nonroot_total + (u100 % nonroot_total != 0));

    unsigned long long inode_free = stat.f_ffree;
    unsigned long long inode_total = stat.f_files;
    int inode_percentage = cast_set<int>(inode_free * 100 / inode_total);
    percent->first = capacity_percentage;
    percent->second = 100 - inode_percentage;

    // Add sync point for testing
    TEST_SYNC_POINT_CALLBACK("BlockFileCache::disk_used_percentage:1", percent);

    return 0;
}

std::string BlockFileCache::reset_capacity(size_t new_capacity) {
    using namespace std::chrono;
    int64_t space_released = 0;
    size_t old_capacity = 0;
    std::stringstream ss;
    ss << "finish reset_capacity, path=" << _cache_base_path;
    auto adjust_start_time = steady_clock::time_point();
    {
        SCOPED_CACHE_LOCK(_mutex, this);
        if (new_capacity < _capacity && new_capacity < _cur_cache_size) {
            int64_t need_remove_size = _cur_cache_size - new_capacity;
            auto remove_blocks = [&](LRUQueue& queue) -> int64_t {
                int64_t queue_released = 0;
                std::vector<FileBlockCell*> to_evict;
                for (const auto& [entry_key, entry_offset, entry_size] : queue) {
                    if (need_remove_size <= 0) {
                        break;
                    }
                    need_remove_size -= entry_size;
                    space_released += entry_size;
                    queue_released += entry_size;
                    auto* cell = get_cell(entry_key, entry_offset, cache_lock);
                    if (!cell->releasable()) {
                        cell->file_block->set_deleting();
                        continue;
                    }
                    to_evict.push_back(cell);
                }
                for (auto& cell : to_evict) {
                    FileBlockSPtr file_block = cell->file_block;
                    std::lock_guard block_lock(file_block->_mutex);
                    remove(file_block, cache_lock, block_lock);
                }
                return queue_released;
            };
            int64_t queue_released = remove_blocks(_disposable_queue);
            ss << " disposable_queue released " << queue_released;
            queue_released = remove_blocks(_normal_queue);
            ss << " normal_queue released " << queue_released;
            queue_released = remove_blocks(_index_queue);
            ss << " index_queue released " << queue_released;
            queue_released = remove_blocks(_ttl_queue);
            ss << " ttl_queue released " << queue_released;

            _disk_resource_limit_mode = true;
            _disk_limit_mode_metrics->set_value(1);
            ss << " total_space_released=" << space_released;
        }
        old_capacity = _capacity;
        _capacity = new_capacity;
        _cache_capacity_metrics->set_value(_capacity);
    }
    auto use_time = duration_cast<milliseconds>(steady_clock::time_point() - adjust_start_time);
    LOG(INFO) << "Finish tag deleted block. path=" << _cache_base_path
              << " use_time=" << cast_set<int64_t>(use_time.count());
    ss << " old_capacity=" << old_capacity << " new_capacity=" << new_capacity;
    LOG(INFO) << ss.str();
    return ss.str();
}

void BlockFileCache::check_disk_resource_limit() {
    if (_storage->get_type() != FileCacheStorageType::DISK) {
        return;
    }

    bool previous_mode = _disk_resource_limit_mode;
    if (_capacity > _cur_cache_size) {
        _disk_resource_limit_mode = false;
        _disk_limit_mode_metrics->set_value(0);
    }
    std::pair<int, int> percent;
    int ret = disk_used_percentage(_cache_base_path, &percent);
    if (ret != 0) {
        LOG_ERROR("").tag("file cache path", _cache_base_path).tag("error", strerror(errno));
        return;
    }
    auto [space_percentage, inode_percentage] = percent;
    auto is_insufficient = [](const int& percentage) {
        return percentage >= config::file_cache_enter_disk_resource_limit_mode_percent;
    };
    DCHECK_GE(space_percentage, 0);
    DCHECK_LE(space_percentage, 100);
    DCHECK_GE(inode_percentage, 0);
    DCHECK_LE(inode_percentage, 100);
    // ATTN: due to that can be changed dynamically, set it to default value if it's invalid
    // FIXME: reject with config validator
    if (config::file_cache_enter_disk_resource_limit_mode_percent <
        config::file_cache_exit_disk_resource_limit_mode_percent) {
        LOG_WARNING("config error, set to default value")
                .tag("enter", config::file_cache_enter_disk_resource_limit_mode_percent)
                .tag("exit", config::file_cache_exit_disk_resource_limit_mode_percent);
        config::file_cache_enter_disk_resource_limit_mode_percent = 88;
        config::file_cache_exit_disk_resource_limit_mode_percent = 80;
    }
    bool is_space_insufficient = is_insufficient(space_percentage);
    bool is_inode_insufficient = is_insufficient(inode_percentage);
    if (is_space_insufficient || is_inode_insufficient) {
        _disk_resource_limit_mode = true;
        _disk_limit_mode_metrics->set_value(1);
    } else if (_disk_resource_limit_mode &&
               (space_percentage < config::file_cache_exit_disk_resource_limit_mode_percent) &&
               (inode_percentage < config::file_cache_exit_disk_resource_limit_mode_percent)) {
        _disk_resource_limit_mode = false;
        _disk_limit_mode_metrics->set_value(0);
    }
    if (previous_mode != _disk_resource_limit_mode) {
        // add log for disk resource limit mode switching
        if (_disk_resource_limit_mode) {
            LOG(WARNING) << "Entering disk resource limit mode: file_cache=" << get_base_path()
                         << " space_percent=" << space_percentage
                         << " inode_percent=" << inode_percentage
                         << " is_space_insufficient=" << is_space_insufficient
                         << " is_inode_insufficient=" << is_inode_insufficient
                         << " enter threshold="
                         << config::file_cache_enter_disk_resource_limit_mode_percent;
        } else {
            LOG(INFO) << "Exiting disk resource limit mode: file_cache=" << get_base_path()
                      << " space_percent=" << space_percentage
                      << " inode_percent=" << inode_percentage << " exit threshold="
                      << config::file_cache_exit_disk_resource_limit_mode_percent;
        }
    } else if (_disk_resource_limit_mode) {
        // print log for disk resource limit mode running, but less frequently
        LOG_EVERY_N(WARNING, 10) << "file_cache=" << get_base_path()
                                 << " space_percent=" << space_percentage
                                 << " inode_percent=" << inode_percentage
                                 << " is_space_insufficient=" << is_space_insufficient
                                 << " is_inode_insufficient=" << is_inode_insufficient
                                 << " mode run in resource limit";
    }
}

void BlockFileCache::check_need_evict_cache_in_advance() {
    if (_storage->get_type() != FileCacheStorageType::DISK) {
        return;
    }

    std::pair<int, int> percent;
    int ret = disk_used_percentage(_cache_base_path, &percent);
    if (ret != 0) {
        LOG_ERROR("").tag("file cache path", _cache_base_path).tag("error", strerror(errno));
        return;
    }
    auto [space_percentage, inode_percentage] = percent;
    int size_percentage = static_cast<int>(_cur_cache_size * 100 / _capacity);
    auto is_insufficient = [](const int& percentage) {
        return percentage >= config::file_cache_enter_need_evict_cache_in_advance_percent;
    };
    DCHECK_GE(space_percentage, 0);
    DCHECK_LE(space_percentage, 100);
    DCHECK_GE(inode_percentage, 0);
    DCHECK_LE(inode_percentage, 100);
    // ATTN: due to that can be changed dynamically, set it to default value if it's invalid
    // FIXME: reject with config validator
    if (config::file_cache_enter_need_evict_cache_in_advance_percent <=
        config::file_cache_exit_need_evict_cache_in_advance_percent) {
        LOG_WARNING("config error, set to default value")
                .tag("enter", config::file_cache_enter_need_evict_cache_in_advance_percent)
                .tag("exit", config::file_cache_exit_need_evict_cache_in_advance_percent);
        config::file_cache_enter_need_evict_cache_in_advance_percent = 78;
        config::file_cache_exit_need_evict_cache_in_advance_percent = 75;
    }
    bool previous_mode = _need_evict_cache_in_advance;
    bool is_space_insufficient = is_insufficient(space_percentage);
    bool is_inode_insufficient = is_insufficient(inode_percentage);
    bool is_size_insufficient = is_insufficient(size_percentage);
    if (is_space_insufficient || is_inode_insufficient || is_size_insufficient) {
        _need_evict_cache_in_advance = true;
        _need_evict_cache_in_advance_metrics->set_value(1);
    } else if (_need_evict_cache_in_advance &&
               (space_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent) &&
               (inode_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent) &&
               (size_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent)) {
        _need_evict_cache_in_advance = false;
        _need_evict_cache_in_advance_metrics->set_value(0);
    }
    if (previous_mode != _need_evict_cache_in_advance) {
        // add log for evict cache in advance mode switching
        if (_need_evict_cache_in_advance) {
            LOG(WARNING) << "Entering evict cache in advance mode: "
                         << "file_cache=" << get_base_path()
                         << " space_percent=" << space_percentage
                         << " inode_percent=" << inode_percentage
                         << " size_percent=" << size_percentage
                         << " is_space_insufficient=" << is_space_insufficient
                         << " is_inode_insufficient=" << is_inode_insufficient
                         << " is_size_insufficient=" << is_size_insufficient << " enter threshold="
                         << config::file_cache_enter_need_evict_cache_in_advance_percent;
        } else {
            LOG(INFO) << "Exiting evict cache in advance mode: "
                      << "file_cache=" << get_base_path() << " space_percent=" << space_percentage
                      << " inode_percent=" << inode_percentage
                      << " size_percent=" << size_percentage << " exit threshold="
                      << config::file_cache_exit_need_evict_cache_in_advance_percent;
        }
    } else if (_need_evict_cache_in_advance) {
        // print log for evict cache in advance mode running, but less frequently
        LOG_EVERY_N(WARNING, 10) << "file_cache=" << get_base_path()
                                 << " space_percent=" << space_percentage
                                 << " inode_percent=" << inode_percentage
                                 << " size_percent=" << size_percentage
                                 << " is_space_insufficient=" << is_space_insufficient
                                 << " is_inode_insufficient=" << is_inode_insufficient
                                 << " is_size_insufficient=" << is_size_insufficient
                                 << " need evict cache in advance";
    }
}

void BlockFileCache::run_background_monitor() {
    Thread::set_self_name("run_background_monitor");
    while (!_close) {
        int64_t interval_ms = config::file_cache_background_monitor_interval_ms;
        TEST_SYNC_POINT_CALLBACK("BlockFileCache::set_sleep_time", &interval_ms);
        check_disk_resource_limit();
        if (config::enable_evict_file_cache_in_advance) {
            check_need_evict_cache_in_advance();
        } else {
            _need_evict_cache_in_advance = false;
            _need_evict_cache_in_advance_metrics->set_value(0);
        }

        {
            std::unique_lock close_lock(_close_mtx);
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
            if (_close) {
                break;
            }
        }
        // report
        {
            SCOPED_CACHE_LOCK(_mutex, this);
            _cur_cache_size_metrics->set_value(_cur_cache_size);
            _cur_ttl_cache_size_metrics->set_value(_cur_cache_size -
                                                   _index_queue.get_capacity(cache_lock) -
                                                   _normal_queue.get_capacity(cache_lock) -
                                                   _disposable_queue.get_capacity(cache_lock));
            _cur_ttl_cache_lru_queue_cache_size_metrics->set_value(
                    _ttl_queue.get_capacity(cache_lock));
            _cur_ttl_cache_lru_queue_element_count_metrics->set_value(
                    _ttl_queue.get_elements_num(cache_lock));
            _cur_normal_queue_cache_size_metrics->set_value(_normal_queue.get_capacity(cache_lock));
            _cur_normal_queue_element_count_metrics->set_value(
                    _normal_queue.get_elements_num(cache_lock));
            _cur_index_queue_cache_size_metrics->set_value(_index_queue.get_capacity(cache_lock));
            _cur_index_queue_element_count_metrics->set_value(
                    _index_queue.get_elements_num(cache_lock));
            _cur_disposable_queue_cache_size_metrics->set_value(
                    _disposable_queue.get_capacity(cache_lock));
            _cur_disposable_queue_element_count_metrics->set_value(
                    _disposable_queue.get_elements_num(cache_lock));

            // Update meta store write queue size if storage is FSFileCacheStorage
            if (_storage->get_type() == FileCacheStorageType::DISK) {
                auto* fs_storage = dynamic_cast<FSFileCacheStorage*>(_storage.get());
                if (fs_storage != nullptr) {
                    auto* meta_store = fs_storage->get_meta_store();
                    if (meta_store != nullptr) {
                        _meta_store_write_queue_size_metrics->set_value(
                                meta_store->get_write_queue_size());
                    }
                }
            }

            if (_num_read_blocks->get_value() > 0) {
                _hit_ratio->set_value((double)_num_hit_blocks->get_value() /
                                      (double)_num_read_blocks->get_value());
            }
            if (_num_read_blocks_5m && _num_read_blocks_5m->get_value() > 0) {
                _hit_ratio_5m->set_value((double)_num_hit_blocks_5m->get_value() /
                                         (double)_num_read_blocks_5m->get_value());
            }
            if (_num_read_blocks_1h && _num_read_blocks_1h->get_value() > 0) {
                _hit_ratio_1h->set_value((double)_num_hit_blocks_1h->get_value() /
                                         (double)_num_read_blocks_1h->get_value());
            }

            if (_no_warmup_num_hit_blocks->get_value() > 0) {
                _no_warmup_hit_ratio->set_value((double)_no_warmup_num_hit_blocks->get_value() /
                                                (double)_no_warmup_num_read_blocks->get_value());
            }
            if (_no_warmup_num_hit_blocks_5m && _no_warmup_num_hit_blocks_5m->get_value() > 0) {
                _no_warmup_hit_ratio_5m->set_value(
                        (double)_no_warmup_num_hit_blocks_5m->get_value() /
                        (double)_no_warmup_num_read_blocks_5m->get_value());
            }
            if (_no_warmup_num_hit_blocks_1h && _no_warmup_num_hit_blocks_1h->get_value() > 0) {
                _no_warmup_hit_ratio_1h->set_value(
                        (double)_no_warmup_num_hit_blocks_1h->get_value() /
                        (double)_no_warmup_num_read_blocks_1h->get_value());
            }
        }
    }
}

void BlockFileCache::run_background_gc() {
    Thread::set_self_name("run_background_gc");
    FileCacheKey key;
    size_t batch_count = 0;
    while (!_close) {
        int64_t interval_ms = config::file_cache_background_gc_interval_ms;
        size_t batch_limit = config::file_cache_remove_block_qps_limit * interval_ms / 1000;
        {
            std::unique_lock close_lock(_close_mtx);
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
            if (_close) {
                break;
            }
        }

        while (batch_count < batch_limit && _recycle_keys.try_dequeue(key)) {
            int64_t duration_ns = 0;
            Status st;
            {
                SCOPED_RAW_TIMER(&duration_ns);
                st = _storage->remove(key);
            }
            *_storage_async_remove_latency_us << (duration_ns / 1000);

            if (!st.ok()) {
                LOG_WARNING("").error(st);
            }
            batch_count++;
        }
        *_recycle_keys_length_recorder << _recycle_keys.size_approx();
        batch_count = 0;
    }
}

void BlockFileCache::run_background_evict_in_advance() {
    Thread::set_self_name("run_background_evict_in_advance");
    LOG(INFO) << "Starting background evict in advance thread";
    int64_t batch = 0;
    while (!_close) {
        {
            std::unique_lock close_lock(_close_mtx);
            _close_cv.wait_for(
                    close_lock,
                    std::chrono::milliseconds(config::file_cache_evict_in_advance_interval_ms));
            if (_close) {
                LOG(INFO) << "Background evict in advance thread exiting due to cache closing";
                break;
            }
        }
        batch = config::file_cache_evict_in_advance_batch_bytes;

        // Skip if eviction not needed or too many pending recycles
        if (!_need_evict_cache_in_advance ||
            _recycle_keys.size_approx() >=
                    config::file_cache_evict_in_advance_recycle_keys_num_threshold) {
            continue;
        }

        int64_t duration_ns = 0;
        {
            SCOPED_CACHE_LOCK(_mutex, this);
            SCOPED_RAW_TIMER(&duration_ns);
            try_evict_in_advance(batch, cache_lock);
        }
        *_evict_in_advance_latency_us << (duration_ns / 1000);
    }
}

void BlockFileCache::run_background_block_lru_update() {
    Thread::set_self_name("run_background_block_lru_update");
    std::vector<FileBlockSPtr> batch;
    while (!_close) {
        int64_t interval_ms = config::file_cache_background_block_lru_update_interval_ms;
        size_t batch_limit =
                config::file_cache_background_block_lru_update_qps_limit * interval_ms / 1000;
        {
            std::unique_lock close_lock(_close_mtx);
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
            if (_close) {
                break;
            }
        }

        batch.clear();
        batch.reserve(batch_limit);
        size_t drained = _need_update_lru_blocks.drain(batch_limit, &batch);
        if (drained == 0) {
            *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
            continue;
        }

        int64_t duration_ns = 0;
        {
            SCOPED_CACHE_LOCK(_mutex, this);
            SCOPED_RAW_TIMER(&duration_ns);
            for (auto& block : batch) {
                update_block_lru(block, cache_lock);
            }
        }
        *_update_lru_blocks_latency_us << (duration_ns / 1000);
        *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
    }
}

std::vector<std::tuple<size_t, size_t, FileCacheType, uint64_t>>
BlockFileCache::get_hot_blocks_meta(const UInt128Wrapper& hash) const {
    int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
                               std::chrono::steady_clock::now().time_since_epoch())
                               .count();
    SCOPED_CACHE_LOCK(_mutex, this);
    std::vector<std::tuple<size_t, size_t, FileCacheType, uint64_t>> blocks_meta;
    if (auto iter = _files.find(hash); iter != _files.end()) {
        for (auto& pair : _files.find(hash)->second) {
            const FileBlockCell* cell = &pair.second;
            if (cell->file_block->cache_type() != FileCacheType::DISPOSABLE) {
                if (cell->file_block->cache_type() == FileCacheType::TTL ||
                    (cell->atime != 0 &&
                     cur_time - cell->atime <
                             get_queue(cell->file_block->cache_type()).get_hot_data_interval())) {
                    blocks_meta.emplace_back(pair.first, cell->size(),
                                             cell->file_block->cache_type(),
                                             cell->file_block->expiration_time());
                }
            }
        }
    }
    return blocks_meta;
}

bool BlockFileCache::try_reserve_during_async_load(size_t size,
                                                   std::lock_guard<std::mutex>& cache_lock) {
    size_t removed_size = 0;
    size_t normal_queue_size = _normal_queue.get_capacity(cache_lock);
    size_t disposable_queue_size = _disposable_queue.get_capacity(cache_lock);
    size_t index_queue_size = _index_queue.get_capacity(cache_lock);

    std::vector<FileBlockCell*> to_evict;
    auto collect_eliminate_fragments = [&](LRUQueue& queue) {
        for (const auto& [entry_key, entry_offset, entry_size] : queue) {
            if (!_disk_resource_limit_mode || removed_size >= size) {
                break;
            }
            auto* cell = get_cell(entry_key, entry_offset, cache_lock);

            DCHECK(cell) << "Cache became inconsistent. UInt128Wrapper: " << entry_key.to_string()
                         << ", offset: " << entry_offset;

            size_t cell_size = cell->size();
            DCHECK(entry_size == cell_size);

            if (cell->releasable()) {
                auto& file_block = cell->file_block;

                std::lock_guard block_lock(file_block->_mutex);
                DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
                to_evict.push_back(cell);
                removed_size += cell_size;
            }
        }
    };
    if (disposable_queue_size != 0) {
        collect_eliminate_fragments(get_queue(FileCacheType::DISPOSABLE));
    }
    if (normal_queue_size != 0) {
        collect_eliminate_fragments(get_queue(FileCacheType::NORMAL));
    }
    if (index_queue_size != 0) {
        collect_eliminate_fragments(get_queue(FileCacheType::INDEX));
    }
    std::string reason = "async load";
    remove_file_blocks(to_evict, cache_lock, true, reason);

    return !_disk_resource_limit_mode || removed_size >= size;
}

void BlockFileCache::clear_need_update_lru_blocks() {
    _need_update_lru_blocks.clear();
    *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
}

std::string BlockFileCache::clear_file_cache_directly() {
    _lru_dumper->remove_lru_dump_files();
    using namespace std::chrono;
    std::stringstream ss;
    auto start = steady_clock::now();
    SCOPED_CACHE_LOCK(_mutex, this);
    LOG_INFO("start clear_file_cache_directly").tag("path", _cache_base_path);

    std::string clear_msg;
    auto s = _storage->clear(clear_msg);
    if (!s.ok()) {
        return clear_msg;
    }

    int64_t num_files = _files.size();
    int64_t cache_size = _cur_cache_size;
    int64_t index_queue_size = _index_queue.get_elements_num(cache_lock);
    int64_t normal_queue_size = _normal_queue.get_elements_num(cache_lock);
    int64_t disposible_queue_size = _disposable_queue.get_elements_num(cache_lock);
    int64_t ttl_queue_size = _ttl_queue.get_elements_num(cache_lock);

    int64_t clear_fd_duration = 0;
    {
        // clear FDCache to release fd
        SCOPED_RAW_TIMER(&clear_fd_duration);
        for (const auto& [file_key, file_blocks] : _files) {
            for (const auto& [offset, file_block_cell] : file_blocks) {
                AccessKeyAndOffset access_key_and_offset(file_key, offset);
                FDCache::instance()->remove_file_reader(access_key_and_offset);
            }
        }
    }

    _files.clear();
    _cur_cache_size = 0;
    _cur_ttl_size = 0;
    _time_to_key.clear();
    _key_to_time.clear();
    _index_queue.clear(cache_lock);
    _normal_queue.clear(cache_lock);
    _disposable_queue.clear(cache_lock);
    _ttl_queue.clear(cache_lock);

    clear_need_update_lru_blocks();

    ss << "finish clear_file_cache_directly"
       << " path=" << _cache_base_path
       << " time_elapsed_ms=" << duration_cast<milliseconds>(steady_clock::now() - start).count()
       << " fd_clear_time_ms=" << (clear_fd_duration / 1000000) << " num_files=" << num_files
       << " cache_size=" << cache_size << " index_queue_size=" << index_queue_size
       << " normal_queue_size=" << normal_queue_size
       << " disposible_queue_size=" << disposible_queue_size << "ttl_queue_size=" << ttl_queue_size;
    auto msg = ss.str();
    LOG(INFO) << msg;
    _lru_dumper->remove_lru_dump_files();
    return msg;
}

std::map<size_t, FileBlockSPtr> BlockFileCache::get_blocks_by_key(const UInt128Wrapper& hash) {
    std::map<size_t, FileBlockSPtr> offset_to_block;
    SCOPED_CACHE_LOCK(_mutex, this);
    if (_files.contains(hash)) {
        for (auto& [offset, cell] : _files[hash]) {
            if (cell.file_block->state() == FileBlock::State::DOWNLOADED) {
                cell.file_block->_owned_by_cached_reader = true;
                offset_to_block.emplace(offset, cell.file_block);
            }
        }
    }
    return offset_to_block;
}

void BlockFileCache::update_ttl_atime(const UInt128Wrapper& hash) {
    SCOPED_CACHE_LOCK(_mutex, this);
    if (auto iter = _files.find(hash); iter != _files.end()) {
        for (auto& [_, cell] : iter->second) {
            cell.update_atime();
        }
    };
}

void BlockFileCache::run_background_lru_log_replay() {
    Thread::set_self_name("run_background_lru_log_replay");
    while (!_close) {
        int64_t interval_ms = config::file_cache_background_lru_log_replay_interval_ms;
        {
            std::unique_lock close_lock(_close_mtx);
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
            if (_close) {
                break;
            }
        }

        _lru_recorder->replay_queue_event(FileCacheType::TTL);
        _lru_recorder->replay_queue_event(FileCacheType::INDEX);
        _lru_recorder->replay_queue_event(FileCacheType::NORMAL);
        _lru_recorder->replay_queue_event(FileCacheType::DISPOSABLE);

        if (config::enable_evaluate_shadow_queue_diff) {
            SCOPED_CACHE_LOCK(_mutex, this);
            _lru_recorder->evaluate_queue_diff(_ttl_queue, "ttl", cache_lock);
            _lru_recorder->evaluate_queue_diff(_index_queue, "index", cache_lock);
            _lru_recorder->evaluate_queue_diff(_normal_queue, "normal", cache_lock);
            _lru_recorder->evaluate_queue_diff(_disposable_queue, "disposable", cache_lock);
        }
    }
}

void BlockFileCache::dump_lru_queues(bool force) {
    std::unique_lock dump_lock(_dump_lru_queues_mtx);
    if (config::file_cache_background_lru_dump_tail_record_num > 0 &&
        !ExecEnv::GetInstance()->get_is_upgrading()) {
        _lru_dumper->dump_queue("disposable", force);
        _lru_dumper->dump_queue("normal", force);
        _lru_dumper->dump_queue("index", force);
        _lru_dumper->dump_queue("ttl", force);
        _lru_dumper->set_first_dump_done();
    }
}

void BlockFileCache::run_background_lru_dump() {
    Thread::set_self_name("run_background_lru_dump");
    while (!_close) {
        int64_t interval_ms = config::file_cache_background_lru_dump_interval_ms;
        {
            std::unique_lock close_lock(_close_mtx);
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
            if (_close) {
                break;
            }
        }
        dump_lru_queues(false);
    }
}

void BlockFileCache::restore_lru_queues_from_disk(std::lock_guard<std::mutex>& cache_lock) {
    // keep this order coz may be duplicated in different queue, we use the first appearence
    _lru_dumper->restore_queue(_ttl_queue, "ttl", cache_lock);
    _lru_dumper->restore_queue(_index_queue, "index", cache_lock);
    _lru_dumper->restore_queue(_normal_queue, "normal", cache_lock);
    _lru_dumper->restore_queue(_disposable_queue, "disposable", cache_lock);
}

std::map<std::string, double> BlockFileCache::get_stats() {
    std::map<std::string, double> stats;
    stats["hits_ratio"] = (double)_hit_ratio->get_value();
    stats["hits_ratio_5m"] = (double)_hit_ratio_5m->get_value();
    stats["hits_ratio_1h"] = (double)_hit_ratio_1h->get_value();

    stats["index_queue_max_size"] = (double)_index_queue.get_max_size();
    stats["index_queue_curr_size"] = (double)_cur_index_queue_cache_size_metrics->get_value();
    stats["index_queue_max_elements"] = (double)_index_queue.get_max_element_size();
    stats["index_queue_curr_elements"] =
            (double)_cur_index_queue_element_count_metrics->get_value();

    stats["ttl_queue_max_size"] = (double)_ttl_queue.get_max_size();
    stats["ttl_queue_curr_size"] = (double)_cur_ttl_cache_lru_queue_cache_size_metrics->get_value();
    stats["ttl_queue_max_elements"] = (double)_ttl_queue.get_max_element_size();
    stats["ttl_queue_curr_elements"] =
            (double)_cur_ttl_cache_lru_queue_element_count_metrics->get_value();

    stats["normal_queue_max_size"] = (double)_normal_queue.get_max_size();
    stats["normal_queue_curr_size"] = (double)_cur_normal_queue_cache_size_metrics->get_value();
    stats["normal_queue_max_elements"] = (double)_normal_queue.get_max_element_size();
    stats["normal_queue_curr_elements"] =
            (double)_cur_normal_queue_element_count_metrics->get_value();

    stats["disposable_queue_max_size"] = (double)_disposable_queue.get_max_size();
    stats["disposable_queue_curr_size"] =
            (double)_cur_disposable_queue_cache_size_metrics->get_value();
    stats["disposable_queue_max_elements"] = (double)_disposable_queue.get_max_element_size();
    stats["disposable_queue_curr_elements"] =
            (double)_cur_disposable_queue_element_count_metrics->get_value();

    stats["need_evict_cache_in_advance"] = (double)_need_evict_cache_in_advance;
    stats["disk_resource_limit_mode"] = (double)_disk_resource_limit_mode;

    stats["total_removed_counts"] = (double)_num_removed_blocks->get_value();
    stats["total_hit_counts"] = (double)_num_hit_blocks->get_value();
    stats["total_read_counts"] = (double)_num_read_blocks->get_value();

    stats["total_read_size"] = (double)_total_read_size_metrics->get_value();
    stats["total_hit_size"] = (double)_total_hit_size_metrics->get_value();
    stats["total_removed_size"] = (double)_total_evict_size_metrics->get_value();

    return stats;
}

// for be UTs
std::map<std::string, double> BlockFileCache::get_stats_unsafe() {
    std::map<std::string, double> stats;
    stats["hits_ratio"] = (double)_hit_ratio->get_value();
    stats["hits_ratio_5m"] = (double)_hit_ratio_5m->get_value();
    stats["hits_ratio_1h"] = (double)_hit_ratio_1h->get_value();

    stats["index_queue_max_size"] = (double)_index_queue.get_max_size();
    stats["index_queue_curr_size"] = (double)_index_queue.get_capacity_unsafe();
    stats["index_queue_max_elements"] = (double)_index_queue.get_max_element_size();
    stats["index_queue_curr_elements"] = (double)_index_queue.get_elements_num_unsafe();

    stats["ttl_queue_max_size"] = (double)_ttl_queue.get_max_size();
    stats["ttl_queue_curr_size"] = (double)_ttl_queue.get_capacity_unsafe();
    stats["ttl_queue_max_elements"] = (double)_ttl_queue.get_max_element_size();
    stats["ttl_queue_curr_elements"] = (double)_ttl_queue.get_elements_num_unsafe();

    stats["normal_queue_max_size"] = (double)_normal_queue.get_max_size();
    stats["normal_queue_curr_size"] = (double)_normal_queue.get_capacity_unsafe();
    stats["normal_queue_max_elements"] = (double)_normal_queue.get_max_element_size();
    stats["normal_queue_curr_elements"] = (double)_normal_queue.get_elements_num_unsafe();

    stats["disposable_queue_max_size"] = (double)_disposable_queue.get_max_size();
    stats["disposable_queue_curr_size"] = (double)_disposable_queue.get_capacity_unsafe();
    stats["disposable_queue_max_elements"] = (double)_disposable_queue.get_max_element_size();
    stats["disposable_queue_curr_elements"] = (double)_disposable_queue.get_elements_num_unsafe();

    stats["need_evict_cache_in_advance"] = (double)_need_evict_cache_in_advance;
    stats["disk_resource_limit_mode"] = (double)_disk_resource_limit_mode;

    stats["total_removed_counts"] = (double)_num_removed_blocks->get_value();
    stats["total_hit_counts"] = (double)_num_hit_blocks->get_value();
    stats["total_read_counts"] = (double)_num_read_blocks->get_value();

    stats["total_read_size"] = (double)_total_read_size_metrics->get_value();
    stats["total_hit_size"] = (double)_total_hit_size_metrics->get_value();
    stats["total_removed_size"] = (double)_total_evict_size_metrics->get_value();

    return stats;
}

template void BlockFileCache::remove(FileBlockSPtr file_block,
                                     std::lock_guard<std::mutex>& cache_lock,
                                     std::lock_guard<std::mutex>& block_lock, bool sync);

#include "common/compile_check_end.h"

Status BlockFileCache::report_file_cache_inconsistency(std::vector<std::string>& results) {
    InconsistencyContext inconsistency_context;
    RETURN_IF_ERROR(check_file_cache_consistency(inconsistency_context));
    auto n = inconsistency_context.types.size();
    results.reserve(n);
    for (size_t i = 0; i < n; i++) {
        std::string result;
        result += "File cache info in manager:\n";
        result += inconsistency_context.infos_in_manager[i].to_string();
        result += "File cache info in storage:\n";
        result += inconsistency_context.infos_in_storage[i].to_string();
        result += inconsistency_context.types[i].to_string();
        result += "\n";
        results.push_back(std::move(result));
    }
    return Status::OK();
}

Status BlockFileCache::check_file_cache_consistency(InconsistencyContext& inconsistency_context) {
    std::lock_guard<std::mutex> cache_lock(_mutex);
    std::vector<FileCacheInfo> infos_in_storage;
    RETURN_IF_ERROR(_storage->get_file_cache_infos(infos_in_storage, cache_lock));
    std::unordered_set<AccessKeyAndOffset, KeyAndOffsetHash> confirmed_blocks;
    for (const auto& info_in_storage : infos_in_storage) {
        confirmed_blocks.insert({info_in_storage.hash, info_in_storage.offset});
        auto* cell = get_cell(info_in_storage.hash, info_in_storage.offset, cache_lock);
        if (cell == nullptr || cell->file_block == nullptr) {
            inconsistency_context.infos_in_manager.emplace_back();
            inconsistency_context.infos_in_storage.push_back(info_in_storage);
            inconsistency_context.types.emplace_back(InconsistencyType::NOT_LOADED);
            continue;
        }
        FileCacheInfo info_in_manager {
                .hash = info_in_storage.hash,
                .expiration_time = cell->file_block->expiration_time(),
                .size = cell->size(),
                .offset = info_in_storage.offset,
                .is_tmp = cell->file_block->state() == FileBlock::State::DOWNLOADING,
                .cache_type = cell->file_block->cache_type()};
        InconsistencyType inconsistent_type;
        if (info_in_storage.is_tmp != info_in_manager.is_tmp) {
            inconsistent_type |= InconsistencyType::TMP_FILE_EXPECT_DOWNLOADING_STATE;
        }
        size_t expected_size =
                info_in_manager.is_tmp ? cell->dowloading_size() : info_in_manager.size;
        if (info_in_storage.size != expected_size) {
            inconsistent_type |= InconsistencyType::SIZE_INCONSISTENT;
        }
        // Only if it is not a tmp file need we check the cache type.
        if ((inconsistent_type & InconsistencyType::TMP_FILE_EXPECT_DOWNLOADING_STATE) == 0 &&
            info_in_storage.cache_type != info_in_manager.cache_type) {
            inconsistent_type |= InconsistencyType::CACHE_TYPE_INCONSISTENT;
        }
        if (info_in_storage.expiration_time != info_in_manager.expiration_time) {
            inconsistent_type |= InconsistencyType::EXPIRATION_TIME_INCONSISTENT;
        }
        if (inconsistent_type != InconsistencyType::NONE) {
            inconsistency_context.infos_in_manager.push_back(info_in_manager);
            inconsistency_context.infos_in_storage.push_back(info_in_storage);
            inconsistency_context.types.push_back(inconsistent_type);
        }
    }

    for (const auto& [hash, offset_to_cell] : _files) {
        for (const auto& [offset, cell] : offset_to_cell) {
            if (confirmed_blocks.contains({hash, offset})) {
                continue;
            }
            const auto& block = cell.file_block;
            inconsistency_context.infos_in_manager.emplace_back(
                    hash, block->expiration_time(), cell.size(), offset,
                    cell.file_block->state() == FileBlock::State::DOWNLOADING, block->cache_type());
            inconsistency_context.infos_in_storage.emplace_back();
            inconsistency_context.types.emplace_back(InconsistencyType::MISSING_IN_STORAGE);
        }
    }
    return Status::OK();
}

} // namespace doris::io
