// blob: 827c516ad75f07fcb83de6cfbc88aae0debc997a [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <butil/macros.h>
#include <glog/logging.h>
#include <stddef.h>
#include <stdint.h>
#include <atomic>
#include <charconv>
#include <memory>
#include <roaring/roaring.hh>
#include <string>
#include <system_error>
#include <utility>
#include <vector>
#include "common/config.h"
#include "common/status.h"
#include "io/fs/file_system.h"
#include "io/fs/path.h"
#include "olap/lru_cache.h"
#include "runtime/exec_env.h"
#include "runtime/memory/lru_cache_policy.h"
#include "runtime/memory/mem_tracker.h"
#include "util/slice.h"
#include "util/time.h"
#include "vec/core/block.h"
namespace doris {
// Payload type of one query cache entry (stored as QueryCache::CacheValue::result):
// a vector of vectorized::BlockUPtr.
using CacheResult = std::vector<vectorized::BlockUPtr>;
// A handle for mid-result from query lru cache.
// The handle will automatically release the cache entry when it is destroyed.
// So the caller need to make sure the handle is valid in lifecycle.
class QueryCacheHandle {
public:
QueryCacheHandle() = default;
QueryCacheHandle(LRUCachePolicy* cache, Cache::Handle* handle)
: _cache(cache), _handle(handle) {}
~QueryCacheHandle() {
if (_handle != nullptr) {
CHECK(_cache != nullptr);
{
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->query_cache_mem_tracker());
_cache->release(_handle);
}
}
}
QueryCacheHandle(QueryCacheHandle&& other) noexcept {
std::swap(_cache, other._cache);
std::swap(_handle, other._handle);
}
QueryCacheHandle& operator=(QueryCacheHandle&& other) noexcept {
std::swap(_cache, other._cache);
std::swap(_handle, other._handle);
return *this;
}
std::vector<int>* get_cache_slot_orders();
CacheResult* get_cache_result();
int64_t get_cache_version();
private:
LRUCachePolicy* _cache = nullptr;
Cache::Handle* _handle = nullptr;
// Don't allow copy and assign
DISALLOW_COPY_AND_ASSIGN(QueryCacheHandle);
};
// LRU cache policy (cache type QUERY_CACHE, size-based) whose entries are
// CacheValue objects: a result, the version it was built for, and slot orders.
class QueryCache : public LRUCachePolicy {
public:
    // Keep the base-class insert overloads visible next to the one declared below.
    using LRUCachePolicy::insert;

    // Value stored for each cache entry.
    struct CacheValue : public LRUCacheValueBase {
        int64_t version;
        CacheResult result;
        std::vector<int> slot_orders;

        // Moves `r` into the value; `so` is copied.
        CacheValue(int64_t v, CacheResult&& r, const std::vector<int>& so)
                : LRUCacheValueBase(), version(v), result(std::move(r)), slot_orders(so) {}
    };

    // Create global instance of this class.
    // The returned object is heap-allocated; this function does not retain it.
    static QueryCache* create_global_cache(size_t capacity, uint32_t num_shards = 16) {
        auto* res = new QueryCache(capacity, num_shards);
        return res;
    }

    // Builds the cache key and parses the data version for a single scan range.
    //
    // On success `*cache_key` is set to
    //   cache_param.digest + <raw bytes of tablet_id> + cache_param.tablet_to_range[tablet_id]
    // and `*version` to the integer parsed from the scan range's version string.
    //
    // Returns InternalError when:
    //   - `scan_ranges` is empty or holds more than one element,
    //   - the version string cannot be parsed as an integer,
    //   - the tablet id is missing from `cache_param.tablet_to_range`.
    static Status build_cache_key(const std::vector<TScanRangeParams>& scan_ranges,
                                  const TQueryCacheParam& cache_param, std::string* cache_key,
                                  int64_t* version) {
        // Guard the scan_ranges[0] access below: indexing an empty vector is
        // undefined behavior.
        if (scan_ranges.empty()) {
            return Status::InternalError(
                    "CacheSourceOperator requires one scan range but got none, plan error");
        }
        if (scan_ranges.size() > 1) {
            return Status::InternalError(
                    "CacheSourceOperator only support one scan range, plan error");
        }
        const auto& scan_range = scan_ranges[0];
        DCHECK(scan_range.scan_range.__isset.palo_scan_range);
        const auto tablet_id = scan_range.scan_range.palo_scan_range.tablet_id;

        // Parse the version and surface failures instead of silently leaving
        // `*version` untouched.
        const auto& version_str = scan_range.scan_range.palo_scan_range.version;
        const auto parsed = std::from_chars(version_str.data(),
                                            version_str.data() + version_str.size(), *version);
        if (parsed.ec != std::errc()) {
            return Status::InternalError("Invalid version in scan range, plan error");
        }

        auto find_tablet = cache_param.tablet_to_range.find(tablet_id);
        if (find_tablet == cache_param.tablet_to_range.end()) {
            return Status::InternalError("Not find tablet in partition_to_tablets, plan error");
        }

        // The tablet id is appended as its in-memory byte representation, not as text.
        *cache_key = cache_param.digest +
                     std::string(reinterpret_cast<const char*>(&tablet_id), sizeof(tablet_id)) +
                     find_tablet->second;
        return Status::OK();
    }

    // Return global instance.
    // Client should call create_global_cache before.
    static QueryCache* instance() { return ExecEnv::GetInstance()->get_query_cache(); }

    QueryCache() = delete;

    // NOTE(review): the literal 3600 * 24 is forwarded as the fifth LRUCachePolicy
    // constructor argument - presumably a one-day interval in seconds; confirm
    // against LRUCachePolicy.
    QueryCache(size_t capacity, uint32_t num_shards)
            : LRUCachePolicy(CachePolicy::CacheType::QUERY_CACHE, capacity, LRUCacheType::SIZE,
                             3600 * 24, num_shards) {}

    // Defined out of line.
    // NOTE(review): presumably returns true and fills `handle` when an entry for
    // `key` matching `version` exists - confirm against the .cpp.
    bool lookup(const CacheKey& key, int64_t version, QueryCacheHandle* handle);

    // Defined out of line.
    // NOTE(review): presumably stores `result` with `version` and the slot orders
    // under `key`, charging `cache_size` to the cache - confirm against the .cpp.
    void insert(const CacheKey& key, int64_t version, CacheResult& result,
                const std::vector<int>& solt_orders, int64_t cache_size);
};
} // namespace doris