| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #pragma once |
| |
| #include <assert.h> |
| #include <butil/macros.h> |
| #include <butil/time.h> |
| #include <gen_cpp/Metrics_types.h> |
| #include <parallel_hashmap/phmap.h> |
| #include <stdint.h> |
| #include <string.h> |
| |
| #include <algorithm> |
| #include <memory> |
| #include <mutex> |
| #include <optional> |
| #include <ostream> |
| #include <string> |
| #include <unordered_map> |
| #include <utility> |
| #include <vector> |
| |
| #include "butil/containers/doubly_buffered_data.h" |
| #include "common/config.h" |
| #include "common/logging.h" |
| #include "common/status.h" |
| #include "gutil/int128.h" |
| #include "olap/lru_cache.h" |
| #include "olap/olap_common.h" |
| #include "olap/rowset/rowset.h" |
| #include "olap/tablet.h" |
| #include "olap/utils.h" |
| #include "runtime/descriptors.h" |
| #include "util/mysql_global.h" |
| #include "util/runtime_profile.h" |
| #include "util/slice.h" |
| #include "vec/core/block.h" |
| #include "vec/data_types/serde/data_type_serde.h" |
| #include "vec/exprs/vexpr_fwd.h" |
| |
| namespace doris { |
| |
| class PTabletKeyLookupRequest; |
| class PTabletKeyLookupResponse; |
| class RuntimeState; |
| class TDescriptorTable; |
| class TExpr; |
| |
| // For caching point lookup pre allocted blocks and exprs |
| class Reusable { |
| public: |
| ~Reusable(); |
| |
| bool is_expired(int64_t ttl_ms) const { |
| return butil::gettimeofday_ms() - _create_timestamp > ttl_ms; |
| } |
| |
| Status init(const TDescriptorTable& t_desc_tbl, const std::vector<TExpr>& output_exprs, |
| const TQueryOptions& query_options, const TabletSchema& schema, |
| size_t block_size = 1); |
| |
| std::unique_ptr<vectorized::Block> get_block(); |
| |
| const vectorized::DataTypeSerDeSPtrs& get_data_type_serdes() const { return _data_type_serdes; } |
| |
| const std::unordered_map<uint32_t, uint32_t>& get_col_uid_to_idx() const { |
| return _col_uid_to_idx; |
| } |
| |
| const std::vector<std::string>& get_col_default_values() const { return _col_default_values; } |
| |
| // do not touch block after returned |
| void return_block(std::unique_ptr<vectorized::Block>& block); |
| |
| TupleDescriptor* tuple_desc() { return _desc_tbl->get_tuple_descriptor(0); } |
| |
| const vectorized::VExprContextSPtrs& output_exprs() { return _output_exprs_ctxs; } |
| |
| int64_t mem_size() const; |
| |
| // delete sign idx in block |
| int32_t delete_sign_idx() const { return _delete_sign_idx; } |
| |
| private: |
| // caching TupleDescriptor, output_expr, etc... |
| std::unique_ptr<RuntimeState> _runtime_state; |
| DescriptorTbl* _desc_tbl; |
| std::mutex _block_mutex; |
| // prevent from allocte too many tmp blocks |
| std::vector<std::unique_ptr<vectorized::Block>> _block_pool; |
| vectorized::VExprContextSPtrs _output_exprs_ctxs; |
| int64_t _create_timestamp = 0; |
| vectorized::DataTypeSerDeSPtrs _data_type_serdes; |
| std::unordered_map<uint32_t, uint32_t> _col_uid_to_idx; |
| std::vector<std::string> _col_default_values; |
| int64_t _mem_size = 0; |
| // delete sign idx in block |
| int32_t _delete_sign_idx = -1; |
| }; |
| |
| // RowCache is a LRU cache for row store |
| class RowCache { |
| public: |
| // The cache key for row lru cache |
| struct RowCacheKey { |
| RowCacheKey(int64_t tablet_id, const Slice& key) : tablet_id(tablet_id), key(key) {} |
| int64_t tablet_id; |
| Slice key; |
| |
| // Encode to a flat binary which can be used as LRUCache's key |
| std::string encode() const { |
| std::string full_key; |
| full_key.resize(sizeof(int64_t) + key.size); |
| int8store(&full_key.front(), tablet_id); |
| memcpy((&full_key.front()) + sizeof(tablet_id), key.data, key.size); |
| return full_key; |
| } |
| }; |
| |
| // A handle for RowCache entry. This class make it easy to handle |
| // Cache entry. Users don't need to release the obtained cache entry. This |
| // class will release the cache entry when it is destroyed. |
| class CacheHandle { |
| public: |
| CacheHandle() = default; |
| CacheHandle(Cache* cache, Cache::Handle* handle) : _cache(cache), _handle(handle) {} |
| ~CacheHandle() { |
| if (_handle != nullptr) { |
| _cache->release(_handle); |
| } |
| } |
| |
| CacheHandle(CacheHandle&& other) noexcept { |
| std::swap(_cache, other._cache); |
| std::swap(_handle, other._handle); |
| } |
| |
| CacheHandle& operator=(CacheHandle&& other) noexcept { |
| std::swap(_cache, other._cache); |
| std::swap(_handle, other._handle); |
| return *this; |
| } |
| |
| bool valid() { return _cache != nullptr && _handle != nullptr; } |
| |
| Cache* cache() const { return _cache; } |
| Slice data() const { return _cache->value_slice(_handle); } |
| |
| private: |
| Cache* _cache = nullptr; |
| Cache::Handle* _handle = nullptr; |
| |
| // Don't allow copy and assign |
| DISALLOW_COPY_AND_ASSIGN(CacheHandle); |
| }; |
| |
| // Create global instance of this class |
| static void create_global_cache(int64_t capacity, uint32_t num_shards = kDefaultNumShards); |
| |
| static RowCache* instance(); |
| |
| // Lookup a row key from cache, |
| // If the Row key is found, the cache entry will be written into handle. |
| // CacheHandle will release cache entry to cache when it destructs |
| // Return true if entry is found, otherwise return false. |
| bool lookup(const RowCacheKey& key, CacheHandle* handle); |
| |
| // Insert a row with key into this cache. |
| // This function is thread-safe, and when two clients insert two same key |
| // concurrently, this function can assure that only one page is cached. |
| // The in_memory page will have higher priority. |
| void insert(const RowCacheKey& key, const Slice& data); |
| |
| // |
| void erase(const RowCacheKey& key); |
| |
| private: |
| static constexpr uint32_t kDefaultNumShards = 128; |
| RowCache(int64_t capacity, int num_shards = kDefaultNumShards); |
| static RowCache* _s_instance; |
| std::unique_ptr<Cache> _cache = nullptr; |
| }; |
| |
| // A cache used for prepare stmt. |
| // One connection per stmt perf uuid |
| class LookupConnectionCache : public LRUCachePolicy { |
| public: |
| static LookupConnectionCache* instance() { return _s_instance; } |
| |
| static void create_global_instance(size_t capacity); |
| |
| private: |
| friend class PointQueryExecutor; |
| LookupConnectionCache(size_t capacity) |
| : LRUCachePolicy("LookupConnectionCache", capacity, LRUCacheType::SIZE, |
| config::tablet_lookup_cache_clean_interval) {} |
| |
| std::string encode_key(__int128_t cache_id) { |
| fmt::memory_buffer buffer; |
| fmt::format_to(buffer, "{}", cache_id); |
| return std::string(buffer.data(), buffer.size()); |
| } |
| |
| void add(__int128_t cache_id, std::shared_ptr<Reusable> item) { |
| std::string key = encode_key(cache_id); |
| CacheValue* value = new CacheValue; |
| value->last_visit_time = UnixMillis(); |
| value->item = item; |
| auto deleter = [](const doris::CacheKey& key, void* value) { |
| CacheValue* cache_value = (CacheValue*)value; |
| delete cache_value; |
| }; |
| LOG(INFO) << "Add item mem size " << item->mem_size() |
| << ", cache_capacity: " << _cache->get_total_capacity() |
| << ", cache_usage: " << _cache->get_usage() |
| << ", mem_consum: " << _cache->mem_consumption(); |
| auto lru_handle = |
| _cache->insert(key, value, item->mem_size(), deleter, CachePriority::NORMAL); |
| _cache->release(lru_handle); |
| } |
| |
| std::shared_ptr<Reusable> get(__int128_t cache_id) { |
| std::string key = encode_key(cache_id); |
| auto lru_handle = _cache->lookup(key); |
| if (lru_handle) { |
| Defer release([cache = _cache.get(), lru_handle] { cache->release(lru_handle); }); |
| auto value = (CacheValue*)_cache->value(lru_handle); |
| value->last_visit_time = UnixMillis(); |
| return value->item; |
| } |
| return nullptr; |
| } |
| |
| struct CacheValue : public LRUCacheValueBase { |
| std::shared_ptr<Reusable> item = nullptr; |
| }; |
| |
| static LookupConnectionCache* _s_instance; |
| }; |
| |
| struct Metrics { |
| Metrics() |
| : init_ns(TUnit::TIME_NS), |
| init_key_ns(TUnit::TIME_NS), |
| lookup_key_ns(TUnit::TIME_NS), |
| lookup_data_ns(TUnit::TIME_NS), |
| output_data_ns(TUnit::TIME_NS) {} |
| RuntimeProfile::Counter init_ns; |
| RuntimeProfile::Counter init_key_ns; |
| RuntimeProfile::Counter lookup_key_ns; |
| RuntimeProfile::Counter lookup_data_ns; |
| RuntimeProfile::Counter output_data_ns; |
| OlapReaderStatistics read_stats; |
| size_t row_cache_hits = 0; |
| bool hit_lookup_cache = false; |
| size_t result_data_bytes; |
| }; |
| |
| // An util to do tablet lookup |
| class PointQueryExecutor { |
| public: |
| Status init(const PTabletKeyLookupRequest* request, PTabletKeyLookupResponse* response); |
| |
| Status lookup_up(); |
| |
| std::string print_profile(); |
| |
| private: |
| Status _init_keys(const PTabletKeyLookupRequest* request); |
| |
| Status _lookup_row_key(); |
| |
| Status _lookup_row_data(); |
| |
| Status _output_data(); |
| |
| static void release_rowset(RowsetSharedPtr* r) { |
| if (r && *r) { |
| VLOG_DEBUG << "release rowset " << (*r)->unique_id(); |
| (*r)->release(); |
| } |
| delete r; |
| } |
| |
| // Read context for each row |
| struct RowReadContext { |
| RowReadContext() : _rowset_ptr(nullptr, &release_rowset) {} |
| std::string _primary_key; |
| RowCache::CacheHandle _cached_row_data; |
| std::optional<RowLocation> _row_location; |
| // rowset will be aquired during read |
| // and released after used |
| std::unique_ptr<RowsetSharedPtr, decltype(&release_rowset)> _rowset_ptr; |
| }; |
| |
| PTabletKeyLookupResponse* _response; |
| TabletSharedPtr _tablet; |
| std::vector<RowReadContext> _row_read_ctxs; |
| std::shared_ptr<Reusable> _reusable; |
| std::unique_ptr<vectorized::Block> _result_block; |
| Metrics _profile_metrics; |
| bool _binary_row_format = false; |
| }; |
| |
| } // namespace doris |