// blob: ffddfe86db51c4f07e7fc9b0f0b9aca2d703207b [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <assert.h>
#include <butil/macros.h>
#include <butil/time.h>
#include <gen_cpp/Metrics_types.h>
#include <parallel_hashmap/phmap.h>
#include <stdint.h>
#include <string.h>
#include <algorithm>
#include <memory>
#include <mutex>
#include <optional>
#include <ostream>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
#include "butil/containers/doubly_buffered_data.h"
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "olap/lru_cache.h"
#include "olap/olap_common.h"
#include "olap/rowset/rowset.h"
#include "olap/tablet.h"
#include "olap/utils.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "util/mysql_global.h"
#include "util/runtime_profile.h"
#include "util/slice.h"
#include "vec/core/block.h"
#include "vec/data_types/serde/data_type_serde.h"
#include "vec/exprs/vexpr_fwd.h"
namespace doris {
// Forward declarations for types this header only names through pointers,
// references or smart pointers.
class PTabletKeyLookupRequest;
class PTabletKeyLookupResponse;
class RuntimeState;
class TDescriptorTable;
class TExpr;
// Caches the pre-allocated blocks and exprs used by point lookups.
class Reusable {
public:
    ~Reusable();

    // Returns true once more than `ttl_ms` milliseconds have elapsed since
    // `_create_timestamp`.
    bool is_expired(int64_t ttl_ms) const {
        return butil::gettimeofday_ms() - _create_timestamp > ttl_ms;
    }

    Status init(const TDescriptorTable& t_desc_tbl, const std::vector<TExpr>& output_exprs,
                const TQueryOptions& query_options, const TabletSchema& schema,
                size_t block_size = 1);

    std::unique_ptr<vectorized::Block> get_block();

    const vectorized::DataTypeSerDeSPtrs& get_data_type_serdes() const { return _data_type_serdes; }

    const std::unordered_map<uint32_t, uint32_t>& get_col_uid_to_idx() const {
        return _col_uid_to_idx;
    }

    const std::vector<std::string>& get_col_default_values() const { return _col_default_values; }

    // do not touch block after returned
    void return_block(std::unique_ptr<vectorized::Block>& block);

    // Tuple descriptor with id 0 of the cached descriptor table.
    // Dereferences `_desc_tbl`, which is nullptr until it has been assigned.
    TupleDescriptor* tuple_desc() { return _desc_tbl->get_tuple_descriptor(0); }

    const vectorized::VExprContextSPtrs& output_exprs() const { return _output_exprs_ctxs; }

    int32_t rs_column_uid() const { return _row_store_column_ids; }

    // Both accessors hand out a reference to the member instead of copying the
    // whole set on every call; the reference is valid as long as this object is.
    const std::unordered_set<int32_t>& missing_col_uids() const { return _missing_col_uids; }
    const std::unordered_set<int32_t>& include_col_uids() const { return _include_col_uids; }

    RuntimeState* runtime_state() { return _runtime_state.get(); }

    // delete sign idx in block
    int32_t delete_sign_idx() const { return _delete_sign_idx; }

private:
    // caching TupleDescriptor, output_expr, etc...
    std::unique_ptr<RuntimeState> _runtime_state;
    DescriptorTbl* _desc_tbl = nullptr;
    std::mutex _block_mutex;
    // prevent from allocating too many tmp blocks
    std::vector<std::unique_ptr<vectorized::Block>> _block_pool;
    vectorized::VExprContextSPtrs _output_exprs_ctxs;
    int64_t _create_timestamp = 0;
    vectorized::DataTypeSerDeSPtrs _data_type_serdes;
    std::unordered_map<uint32_t, uint32_t> _col_uid_to_idx;
    std::vector<std::string> _col_default_values;
    // picked rowstore(column group) column unique id
    int32_t _row_store_column_ids = -1;
    // some columns are missing in rowstore(column group), we need to fill them with column store values
    std::unordered_set<int32_t> _missing_col_uids;
    // included cids in rowstore(column group)
    std::unordered_set<int32_t> _include_col_uids;
    // delete sign idx in block
    int32_t _delete_sign_idx = -1;
};
// RowCache is a LRU cache for row store
class RowCache : public LRUCachePolicy {
public:
using LRUCachePolicy::insert;
// The cache key for row lru cache
struct RowCacheKey {
RowCacheKey(int64_t tablet_id, const Slice& key) : tablet_id(tablet_id), key(key) {}
int64_t tablet_id;
Slice key;
// Encode to a flat binary which can be used as LRUCache's key
std::string encode() const {
std::string full_key;
full_key.reserve(sizeof(int64_t) + key.size);
const char* tid = reinterpret_cast<const char*>(&tablet_id);
full_key.append(tid, tid + sizeof(int64_t));
full_key.append(key.data, key.size);
return full_key;
}
};
class RowCacheValue : public LRUCacheValueBase {
public:
~RowCacheValue() override { free(cache_value); }
char* cache_value;
};
// A handle for RowCache entry. This class make it easy to handle
// Cache entry. Users don't need to release the obtained cache entry. This
// class will release the cache entry when it is destroyed.
class CacheHandle {
public:
CacheHandle() = default;
CacheHandle(LRUCachePolicy* cache, Cache::Handle* handle)
: _cache(cache), _handle(handle) {}
~CacheHandle() {
if (_handle != nullptr) {
_cache->release(_handle);
}
}
CacheHandle(CacheHandle&& other) noexcept {
std::swap(_cache, other._cache);
std::swap(_handle, other._handle);
}
CacheHandle& operator=(CacheHandle&& other) noexcept {
std::swap(_cache, other._cache);
std::swap(_handle, other._handle);
return *this;
}
bool valid() { return _cache != nullptr && _handle != nullptr; }
LRUCachePolicy* cache() const { return _cache; }
Slice data() const {
return {(char*)((RowCacheValue*)_cache->value(_handle))->cache_value,
reinterpret_cast<LRUHandle*>(_handle)->charge};
}
private:
LRUCachePolicy* _cache = nullptr;
Cache::Handle* _handle = nullptr;
// Don't allow copy and assign
DISALLOW_COPY_AND_ASSIGN(CacheHandle);
};
// Create global instance of this class
static RowCache* create_global_cache(int64_t capacity, uint32_t num_shards = kDefaultNumShards);
static RowCache* instance();
// Lookup a row key from cache,
// If the Row key is found, the cache entry will be written into handle.
// CacheHandle will release cache entry to cache when it destructs
// Return true if entry is found, otherwise return false.
bool lookup(const RowCacheKey& key, CacheHandle* handle);
// Insert a row with key into this cache.
// This function is thread-safe, and when two clients insert two same key
// concurrently, this function can assure that only one page is cached.
// The in_memory page will have higher priority.
void insert(const RowCacheKey& key, const Slice& data);
//
void erase(const RowCacheKey& key);
private:
static constexpr uint32_t kDefaultNumShards = 128;
RowCache(int64_t capacity, int num_shards = kDefaultNumShards);
};
// A cache used for prepared statements.
// One entry per stmt uuid (presumably what callers pass as `cache_id`).
class LookupConnectionCache : public LRUCachePolicy {
public:
    static LookupConnectionCache* instance() {
        return ExecEnv::GetInstance()->get_lookup_connection_cache();
    }

    static LookupConnectionCache* create_global_instance(size_t capacity);

private:
    friend class PointQueryExecutor;

    // Value stored in the LRU cache: keeps the shared Reusable alive while cached.
    class CacheValue : public LRUCacheValueBase {
    public:
        ~CacheValue() override;
        std::shared_ptr<Reusable> item;
    };

    // `explicit` so a bare size_t never converts into a cache by accident.
    explicit LookupConnectionCache(size_t capacity)
            : LRUCachePolicy(CachePolicy::CacheType::LOOKUP_CONNECTION_CACHE, capacity,
                             LRUCacheType::NUMBER,
                             config::tablet_lookup_cache_stale_sweep_time_sec) {}

    // Formats the 128-bit id as text so it can be used as the cache key.
    static std::string encode_key(__int128_t cache_id) {
        fmt::memory_buffer buffer;
        fmt::format_to(buffer, "{}", cache_id);
        return std::string(buffer.data(), buffer.size());
    }

    // Stores `item` under `cache_id` with a charge of 1, then drops the handle
    // returned by insert().
    void add(__int128_t cache_id, std::shared_ptr<Reusable> item) {
        std::string key = encode_key(cache_id);
        auto* value = new CacheValue;
        // `item` is a by-value sink parameter: move it to avoid an extra
        // atomic ref-count increment/decrement.
        value->item = std::move(item);
        VLOG_DEBUG << "Add item mem"
                   << ", cache_capacity: " << get_capacity() << ", cache_usage: " << get_usage()
                   << ", mem_consum: " << mem_consumption();
        auto* lru_handle = insert(key, value, 1, sizeof(Reusable), CachePriority::NORMAL);
        release(lru_handle);
    }

    // Returns the Reusable cached under `cache_id`, or nullptr when there is no
    // such entry.
    std::shared_ptr<Reusable> get(__int128_t cache_id) {
        std::string key = encode_key(cache_id);
        auto* lru_handle = lookup(key);
        if (lru_handle == nullptr) {
            return nullptr;
        }
        // Release the handle on scope exit; the returned shared_ptr is copied
        // before the Defer runs.
        Defer release([cache = this, lru_handle] { cache->release(lru_handle); });
        auto* value = static_cast<CacheValue*>(LRUCachePolicy::value(lru_handle));
        return value->item;
    }
};
// Profile counters kept by PointQueryExecutor (see its `_profile_metrics` member).
struct Metrics {
    // Every timer counter is created with the TIME_NS unit.
    Metrics()
            : init_ns(TUnit::TIME_NS),
              init_key_ns(TUnit::TIME_NS),
              lookup_key_ns(TUnit::TIME_NS),
              lookup_data_ns(TUnit::TIME_NS),
              output_data_ns(TUnit::TIME_NS),
              load_segment_key_stage_ns(TUnit::TIME_NS),
              load_segment_data_stage_ns(TUnit::TIME_NS) {}

    RuntimeProfile::Counter init_ns;
    RuntimeProfile::Counter init_key_ns;
    RuntimeProfile::Counter lookup_key_ns;
    RuntimeProfile::Counter lookup_data_ns;
    RuntimeProfile::Counter output_data_ns;
    RuntimeProfile::Counter load_segment_key_stage_ns;
    RuntimeProfile::Counter load_segment_data_stage_ns;
    OlapReaderStatistics read_stats;
    size_t row_cache_hits = 0;
    bool hit_lookup_cache = false;
    // Zero-initialized so reading it before any result is produced never yields
    // an indeterminate value.
    size_t result_data_bytes = 0;
};
// An util to do tablet lookup
class PointQueryExecutor {
public:
~PointQueryExecutor();
Status init(const PTabletKeyLookupRequest* request, PTabletKeyLookupResponse* response);
Status lookup_up();
void print_profile();
const OlapReaderStatistics& read_stats() const { return _read_stats; }
private:
Status _init_keys(const PTabletKeyLookupRequest* request);
Status _lookup_row_key();
Status _lookup_row_data();
Status _output_data();
static void release_rowset(RowsetSharedPtr* r) {
if (r && *r) {
VLOG_DEBUG << "release rowset " << (*r)->rowset_id();
(*r)->release();
}
delete r;
}
// Read context for each row
struct RowReadContext {
RowReadContext() : _rowset_ptr(nullptr, &release_rowset) {}
std::string _primary_key;
RowCache::CacheHandle _cached_row_data;
std::optional<RowLocation> _row_location;
// rowset will be aquired during read
// and released after used
std::unique_ptr<RowsetSharedPtr, decltype(&release_rowset)> _rowset_ptr;
};
PTabletKeyLookupResponse* _response = nullptr;
BaseTabletSPtr _tablet;
std::vector<RowReadContext> _row_read_ctxs;
std::shared_ptr<Reusable> _reusable;
std::unique_ptr<vectorized::Block> _result_block;
Metrics _profile_metrics;
bool _binary_row_format = false;
OlapReaderStatistics _read_stats;
int32_t _row_hits = 0;
// snapshot read version
int64_t _version = -1;
};
} // namespace doris