|  | // Licensed to the Apache Software Foundation (ASF) under one | 
|  | // or more contributor license agreements.  See the NOTICE file | 
|  | // distributed with this work for additional information | 
|  | // regarding copyright ownership.  The ASF licenses this file | 
|  | // to you under the Apache License, Version 2.0 (the | 
|  | // "License"); you may not use this file except in compliance | 
|  | // with the License.  You may obtain a copy of the License at | 
|  | // | 
|  | //   http://www.apache.org/licenses/LICENSE-2.0 | 
|  | // | 
|  | // Unless required by applicable law or agreed to in writing, | 
|  | // software distributed under the License is distributed on an | 
|  | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | 
|  | // KIND, either express or implied.  See the License for the | 
|  | // specific language governing permissions and limitations | 
|  | // under the License. | 
|  |  | 
|  | #include "service/point_query_executor.h" | 
|  |  | 
|  | #include <fmt/format.h> | 
|  | #include <gen_cpp/Descriptors_types.h> | 
|  | #include <gen_cpp/Exprs_types.h> | 
|  | #include <gen_cpp/PaloInternalService_types.h> | 
|  | #include <gen_cpp/internal_service.pb.h> | 
|  | #include <stdlib.h> | 
|  |  | 
|  | #include <unordered_map> | 
|  | #include <vector> | 
|  |  | 
|  | #include "gutil/integral_types.h" | 
|  | #include "olap/lru_cache.h" | 
|  | #include "olap/olap_tuple.h" | 
|  | #include "olap/row_cursor.h" | 
|  | #include "olap/storage_engine.h" | 
|  | #include "olap/tablet_manager.h" | 
|  | #include "olap/tablet_schema.h" | 
|  | #include "runtime/runtime_state.h" | 
|  | #include "util/key_util.h" | 
|  | #include "util/runtime_profile.h" | 
|  | #include "util/simd/bits.h" | 
|  | #include "util/thrift_util.h" | 
|  | #include "vec/columns/columns_number.h" | 
|  | #include "vec/data_types/serde/data_type_serde.h" | 
|  | #include "vec/exprs/vexpr.h" | 
|  | #include "vec/exprs/vexpr_context.h" | 
|  | #include "vec/jsonb/serialize.h" | 
|  | #include "vec/sink/vmysql_result_writer.h" | 
|  |  | 
|  | namespace doris { | 
|  |  | 
|  | Reusable::~Reusable() {} | 
|  | constexpr static int s_preallocted_blocks_num = 32; | 
|  | Status Reusable::init(const TDescriptorTable& t_desc_tbl, const std::vector<TExpr>& output_exprs, | 
|  | const TQueryOptions& query_options, const TabletSchema& schema, | 
|  | size_t block_size) { | 
|  | SCOPED_MEM_COUNT(&_mem_size); | 
|  | _runtime_state = RuntimeState::create_unique(); | 
|  | _runtime_state->set_query_options(query_options); | 
|  | RETURN_IF_ERROR(DescriptorTbl::create(_runtime_state->obj_pool(), t_desc_tbl, &_desc_tbl)); | 
|  | _runtime_state->set_desc_tbl(_desc_tbl); | 
|  | _block_pool.resize(block_size); | 
|  | for (int i = 0; i < _block_pool.size(); ++i) { | 
|  | _block_pool[i] = vectorized::Block::create_unique(tuple_desc()->slots(), 2); | 
|  | // Name is useless but cost space | 
|  | _block_pool[i]->clear_names(); | 
|  | } | 
|  |  | 
|  | RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(output_exprs, _output_exprs_ctxs)); | 
|  | RowDescriptor row_desc(tuple_desc(), false); | 
|  | // Prepare the exprs to run. | 
|  | RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_exprs_ctxs, _runtime_state.get(), row_desc)); | 
|  | _create_timestamp = butil::gettimeofday_ms(); | 
|  | _data_type_serdes = vectorized::create_data_type_serdes(tuple_desc()->slots()); | 
|  | _col_default_values.resize(tuple_desc()->slots().size()); | 
|  | for (int i = 0; i < tuple_desc()->slots().size(); ++i) { | 
|  | auto slot = tuple_desc()->slots()[i]; | 
|  | _col_uid_to_idx[slot->col_unique_id()] = i; | 
|  | _col_default_values[i] = slot->col_default_value(); | 
|  | } | 
|  | // get the delete sign idx in block | 
|  | _delete_sign_idx = _col_uid_to_idx[schema.columns()[schema.delete_sign_idx()].unique_id()]; | 
|  | return Status::OK(); | 
|  | } | 
|  |  | 
|  | std::unique_ptr<vectorized::Block> Reusable::get_block() { | 
|  | std::lock_guard lock(_block_mutex); | 
|  | if (_block_pool.empty()) { | 
|  | auto block = vectorized::Block::create_unique(tuple_desc()->slots(), 2); | 
|  | // Name is useless but cost space | 
|  | block->clear_names(); | 
|  | return block; | 
|  | } | 
|  | auto block = std::move(_block_pool.back()); | 
|  | CHECK(block != nullptr); | 
|  | _block_pool.pop_back(); | 
|  | return block; | 
|  | } | 
|  |  | 
|  | void Reusable::return_block(std::unique_ptr<vectorized::Block>& block) { | 
|  | std::lock_guard lock(_block_mutex); | 
|  | if (block == nullptr) { | 
|  | return; | 
|  | } | 
|  | block->clear_column_data(); | 
|  | _block_pool.push_back(std::move(block)); | 
|  | if (_block_pool.size() > s_preallocted_blocks_num) { | 
|  | _block_pool.resize(s_preallocted_blocks_num); | 
|  | } | 
|  | } | 
|  |  | 
|  | int64_t Reusable::mem_size() const { | 
|  | return _mem_size; | 
|  | } | 
|  |  | 
|  | LookupConnectionCache* LookupConnectionCache::_s_instance = nullptr; | 
|  | void LookupConnectionCache::create_global_instance(size_t capacity) { | 
|  | DCHECK(_s_instance == nullptr); | 
|  | static LookupConnectionCache instance(capacity); | 
|  | _s_instance = &instance; | 
|  | } | 
|  |  | 
|  | RowCache* RowCache::_s_instance = nullptr; | 
|  |  | 
|  | RowCache::RowCache(int64_t capacity, int num_shards) { | 
|  | // Create Row Cache | 
|  | _cache = std::unique_ptr<Cache>( | 
|  | new_lru_cache("RowCache", capacity, LRUCacheType::SIZE, num_shards)); | 
|  | } | 
|  |  | 
|  | // Create global instance of this class | 
|  | void RowCache::create_global_cache(int64_t capacity, uint32_t num_shards) { | 
|  | DCHECK(_s_instance == nullptr); | 
|  | static RowCache instance(capacity, num_shards); | 
|  | _s_instance = &instance; | 
|  | } | 
|  |  | 
|  | RowCache* RowCache::instance() { | 
|  | return _s_instance; | 
|  | } | 
|  |  | 
|  | bool RowCache::lookup(const RowCacheKey& key, CacheHandle* handle) { | 
|  | const std::string& encoded_key = key.encode(); | 
|  | auto lru_handle = _cache->lookup(encoded_key); | 
|  | if (!lru_handle) { | 
|  | // cache miss | 
|  | return false; | 
|  | } | 
|  | *handle = CacheHandle(_cache.get(), lru_handle); | 
|  | return true; | 
|  | } | 
|  |  | 
|  | void RowCache::insert(const RowCacheKey& key, const Slice& value) { | 
|  | auto deleter = [](const doris::CacheKey& key, void* value) { free(value); }; | 
|  | char* cache_value = static_cast<char*>(malloc(value.size)); | 
|  | memcpy(cache_value, value.data, value.size); | 
|  | const std::string& encoded_key = key.encode(); | 
|  | auto handle = | 
|  | _cache->insert(encoded_key, cache_value, value.size, deleter, CachePriority::NORMAL); | 
|  | // handle will released | 
|  | auto tmp = CacheHandle {_cache.get(), handle}; | 
|  | } | 
|  |  | 
|  | void RowCache::erase(const RowCacheKey& key) { | 
|  | const std::string& encoded_key = key.encode(); | 
|  | _cache->erase(encoded_key); | 
|  | } | 
|  |  | 
|  | Status PointQueryExecutor::init(const PTabletKeyLookupRequest* request, | 
|  | PTabletKeyLookupResponse* response) { | 
|  | SCOPED_TIMER(&_profile_metrics.init_ns); | 
|  | _response = response; | 
|  | // using cache | 
|  | __int128_t uuid = | 
|  | static_cast<__int128_t>(request->uuid().uuid_high()) << 64 | request->uuid().uuid_low(); | 
|  | auto cache_handle = LookupConnectionCache::instance()->get(uuid); | 
|  | _binary_row_format = request->is_binary_row(); | 
|  | _tablet = StorageEngine::instance()->tablet_manager()->get_tablet(request->tablet_id()); | 
|  | if (_tablet == nullptr) { | 
|  | LOG(WARNING) << "failed to do tablet_fetch_data. tablet [" << request->tablet_id() | 
|  | << "] is not exist"; | 
|  | return Status::NotFound(fmt::format("tablet {} not exist", request->tablet_id())); | 
|  | } | 
|  | if (cache_handle != nullptr) { | 
|  | _reusable = cache_handle; | 
|  | _profile_metrics.hit_lookup_cache = true; | 
|  | } else { | 
|  | // init handle | 
|  | auto reusable_ptr = std::make_shared<Reusable>(); | 
|  | TDescriptorTable t_desc_tbl; | 
|  | TExprList t_output_exprs; | 
|  | uint32_t len = request->desc_tbl().size(); | 
|  | RETURN_IF_ERROR( | 
|  | deserialize_thrift_msg(reinterpret_cast<const uint8_t*>(request->desc_tbl().data()), | 
|  | &len, false, &t_desc_tbl)); | 
|  | len = request->output_expr().size(); | 
|  | RETURN_IF_ERROR(deserialize_thrift_msg( | 
|  | reinterpret_cast<const uint8_t*>(request->output_expr().data()), &len, false, | 
|  | &t_output_exprs)); | 
|  | _reusable = reusable_ptr; | 
|  | TQueryOptions t_query_options; | 
|  | len = request->query_options().size(); | 
|  | if (request->has_query_options()) { | 
|  | RETURN_IF_ERROR(deserialize_thrift_msg( | 
|  | reinterpret_cast<const uint8_t*>(request->query_options().data()), &len, false, | 
|  | &t_query_options)); | 
|  | } | 
|  | if (uuid != 0) { | 
|  | // could be reused by requests after, pre allocte more blocks | 
|  | RETURN_IF_ERROR(reusable_ptr->init(t_desc_tbl, t_output_exprs.exprs, t_query_options, | 
|  | *_tablet->tablet_schema(), | 
|  | s_preallocted_blocks_num)); | 
|  | LookupConnectionCache::instance()->add(uuid, reusable_ptr); | 
|  | } else { | 
|  | RETURN_IF_ERROR(reusable_ptr->init(t_desc_tbl, t_output_exprs.exprs, t_query_options, | 
|  | *_tablet->tablet_schema(), 1)); | 
|  | } | 
|  | } | 
|  | RETURN_IF_ERROR(_init_keys(request)); | 
|  | _result_block = _reusable->get_block(); | 
|  | CHECK(_result_block != nullptr); | 
|  | return Status::OK(); | 
|  | } | 
|  |  | 
|  | Status PointQueryExecutor::lookup_up() { | 
|  | RETURN_IF_ERROR(_lookup_row_key()); | 
|  | RETURN_IF_ERROR(_lookup_row_data()); | 
|  | RETURN_IF_ERROR(_output_data()); | 
|  | return Status::OK(); | 
|  | } | 
|  |  | 
|  | std::string PointQueryExecutor::print_profile() { | 
|  | auto init_us = _profile_metrics.init_ns.value() / 1000; | 
|  | auto init_key_us = _profile_metrics.init_key_ns.value() / 1000; | 
|  | auto lookup_key_us = _profile_metrics.lookup_key_ns.value() / 1000; | 
|  | auto lookup_data_us = _profile_metrics.lookup_data_ns.value() / 1000; | 
|  | auto output_data_us = _profile_metrics.output_data_ns.value() / 1000; | 
|  | auto total_us = init_us + lookup_key_us + lookup_data_us + output_data_us; | 
|  | auto read_stats = _profile_metrics.read_stats; | 
|  | return fmt::format( | 
|  | "" | 
|  | "[lookup profile:{}us] init:{}us, init_key:{}us," | 
|  | "" | 
|  | "" | 
|  | "lookup_key:{}us, lookup_data:{}us, output_data:{}us, hit_lookup_cache:{}" | 
|  | "" | 
|  | "" | 
|  | ", is_binary_row:{}, output_columns:{}, total_keys:{}, row_cache_hits:{}" | 
|  | ", hit_cached_pages:{}, total_pages_read:{}, compressed_bytes_read:{}, " | 
|  | "io_latency:{}ns, " | 
|  | "uncompressed_bytes_read:{}, result_data_bytes:{}" | 
|  | "", | 
|  | total_us, init_us, init_key_us, lookup_key_us, lookup_data_us, output_data_us, | 
|  | _profile_metrics.hit_lookup_cache, _binary_row_format, _reusable->output_exprs().size(), | 
|  | _row_read_ctxs.size(), _profile_metrics.row_cache_hits, read_stats.cached_pages_num, | 
|  | read_stats.total_pages_num, read_stats.compressed_bytes_read, read_stats.io_ns, | 
|  | read_stats.uncompressed_bytes_read, _profile_metrics.result_data_bytes); | 
|  | } | 
|  |  | 
|  | Status PointQueryExecutor::_init_keys(const PTabletKeyLookupRequest* request) { | 
|  | SCOPED_TIMER(&_profile_metrics.init_key_ns); | 
|  | // 1. get primary key from conditions | 
|  | std::vector<OlapTuple> olap_tuples; | 
|  | olap_tuples.resize(request->key_tuples().size()); | 
|  | for (size_t i = 0; i < request->key_tuples().size(); ++i) { | 
|  | const KeyTuple& key_tuple = request->key_tuples(i); | 
|  | for (const std::string& key_col : key_tuple.key_column_rep()) { | 
|  | olap_tuples[i].add_value(key_col); | 
|  | } | 
|  | } | 
|  | _row_read_ctxs.resize(olap_tuples.size()); | 
|  | // get row cursor and encode keys | 
|  | for (size_t i = 0; i < olap_tuples.size(); ++i) { | 
|  | RowCursor cursor; | 
|  | RETURN_IF_ERROR(cursor.init_scan_key(_tablet->tablet_schema(), olap_tuples[i].values())); | 
|  | RETURN_IF_ERROR(cursor.from_tuple(olap_tuples[i])); | 
|  | encode_key_with_padding<RowCursor, true, true>(&_row_read_ctxs[i]._primary_key, cursor, | 
|  | _tablet->tablet_schema()->num_key_columns(), | 
|  | true); | 
|  | } | 
|  | return Status::OK(); | 
|  | } | 
|  |  | 
|  | Status PointQueryExecutor::_lookup_row_key() { | 
|  | SCOPED_TIMER(&_profile_metrics.lookup_key_ns); | 
|  | // 2. lookup row location | 
|  | Status st; | 
|  | std::vector<RowsetSharedPtr> specified_rowsets; | 
|  | { | 
|  | std::shared_lock rlock(_tablet->get_header_lock()); | 
|  | specified_rowsets = _tablet->get_rowset_by_ids(nullptr); | 
|  | } | 
|  | std::vector<std::unique_ptr<SegmentCacheHandle>> segment_caches(specified_rowsets.size()); | 
|  | for (size_t i = 0; i < _row_read_ctxs.size(); ++i) { | 
|  | RowLocation location; | 
|  | if (!config::disable_storage_row_cache) { | 
|  | RowCache::CacheHandle cache_handle; | 
|  | auto hit_cache = RowCache::instance()->lookup( | 
|  | {_tablet->tablet_id(), _row_read_ctxs[i]._primary_key}, &cache_handle); | 
|  | if (hit_cache) { | 
|  | _row_read_ctxs[i]._cached_row_data = std::move(cache_handle); | 
|  | ++_profile_metrics.row_cache_hits; | 
|  | continue; | 
|  | } | 
|  | } | 
|  | // Get rowlocation and rowset, ctx._rowset_ptr will acquire wrap this ptr | 
|  | auto rowset_ptr = std::make_unique<RowsetSharedPtr>(); | 
|  | st = (_tablet->lookup_row_key(_row_read_ctxs[i]._primary_key, nullptr, false, | 
|  | specified_rowsets, &location, INT32_MAX /*rethink?*/, | 
|  | segment_caches, rowset_ptr.get())); | 
|  | if (st.is<ErrorCode::KEY_NOT_FOUND>()) { | 
|  | continue; | 
|  | } | 
|  | RETURN_IF_ERROR(st); | 
|  | _row_read_ctxs[i]._row_location = location; | 
|  | // acquire and wrap this rowset | 
|  | (*rowset_ptr)->acquire(); | 
|  | VLOG_DEBUG << "aquire rowset " << (*rowset_ptr)->unique_id(); | 
|  | _row_read_ctxs[i]._rowset_ptr = std::unique_ptr<RowsetSharedPtr, decltype(&release_rowset)>( | 
|  | rowset_ptr.release(), &release_rowset); | 
|  | } | 
|  | return Status::OK(); | 
|  | } | 
|  |  | 
|  | Status PointQueryExecutor::_lookup_row_data() { | 
|  | // 3. get values | 
|  | SCOPED_TIMER(&_profile_metrics.lookup_data_ns); | 
|  | for (size_t i = 0; i < _row_read_ctxs.size(); ++i) { | 
|  | if (_row_read_ctxs[i]._cached_row_data.valid()) { | 
|  | vectorized::JsonbSerializeUtil::jsonb_to_block( | 
|  | _reusable->get_data_type_serdes(), | 
|  | _row_read_ctxs[i]._cached_row_data.data().data, | 
|  | _row_read_ctxs[i]._cached_row_data.data().size, _reusable->get_col_uid_to_idx(), | 
|  | *_result_block, _reusable->get_col_default_values()); | 
|  | continue; | 
|  | } | 
|  | if (!_row_read_ctxs[i]._row_location.has_value()) { | 
|  | continue; | 
|  | } | 
|  | std::string value; | 
|  | RETURN_IF_ERROR(_tablet->lookup_row_data( | 
|  | _row_read_ctxs[i]._primary_key, _row_read_ctxs[i]._row_location.value(), | 
|  | *(_row_read_ctxs[i]._rowset_ptr), _reusable->tuple_desc(), | 
|  | _profile_metrics.read_stats, value, | 
|  | !config::disable_storage_row_cache /*whether write row cache*/)); | 
|  | // serilize value to block, currently only jsonb row formt | 
|  | vectorized::JsonbSerializeUtil::jsonb_to_block( | 
|  | _reusable->get_data_type_serdes(), value.data(), value.size(), | 
|  | _reusable->get_col_uid_to_idx(), *_result_block, | 
|  | _reusable->get_col_default_values()); | 
|  | } | 
|  | // filter rows by delete sign | 
|  | if (_result_block->rows() > 0 && _reusable->delete_sign_idx() != -1) { | 
|  | vectorized::ColumnPtr delete_filter_columns = | 
|  | _result_block->get_columns()[_reusable->delete_sign_idx()]; | 
|  | const auto& filter = | 
|  | assert_cast<const vectorized::ColumnInt8*>(delete_filter_columns.get())->get_data(); | 
|  | size_t count = filter.size() - simd::count_zero_num((int8_t*)filter.data(), filter.size()); | 
|  | if (count == filter.size()) { | 
|  | _result_block->clear(); | 
|  | } else if (count > 0) { | 
|  | return Status::NotSupported("Not implemented since only single row at present"); | 
|  | } | 
|  | } | 
|  | return Status::OK(); | 
|  | } | 
|  |  | 
|  | template <typename MysqlWriter> | 
|  | Status _serialize_block(MysqlWriter& mysql_writer, vectorized::Block& block, | 
|  | PTabletKeyLookupResponse* response) { | 
|  | block.clear_names(); | 
|  | RETURN_IF_ERROR(mysql_writer.append_block(block)); | 
|  | assert(mysql_writer.results().size() == 1); | 
|  | uint8_t* buf = nullptr; | 
|  | uint32_t len = 0; | 
|  | ThriftSerializer ser(false, 4096); | 
|  | RETURN_IF_ERROR(ser.serialize(&(mysql_writer.results()[0])->result_batch, &len, &buf)); | 
|  | response->set_row_batch(std::string((const char*)buf, len)); | 
|  | return Status::OK(); | 
|  | } | 
|  |  | 
|  | Status PointQueryExecutor::_output_data() { | 
|  | // 4. exprs exec and serialize to mysql row batches | 
|  | SCOPED_TIMER(&_profile_metrics.output_data_ns); | 
|  | if (_result_block->rows()) { | 
|  | // TODO reuse mysql_writer | 
|  | if (_binary_row_format) { | 
|  | vectorized::VMysqlResultWriter<true> mysql_writer(nullptr, _reusable->output_exprs(), | 
|  | nullptr); | 
|  | RETURN_IF_ERROR(_serialize_block(mysql_writer, *_result_block, _response)); | 
|  | } else { | 
|  | vectorized::VMysqlResultWriter<false> mysql_writer(nullptr, _reusable->output_exprs(), | 
|  | nullptr); | 
|  | RETURN_IF_ERROR(_serialize_block(mysql_writer, *_result_block, _response)); | 
|  | } | 
|  | VLOG_DEBUG << "dump block " << _result_block->dump_data(); | 
|  | } else { | 
|  | _response->set_empty_batch(true); | 
|  | } | 
|  | _profile_metrics.result_data_bytes = _result_block->bytes(); | 
|  | _reusable->return_block(_result_block); | 
|  | return Status::OK(); | 
|  | } | 
|  |  | 
|  | } // namespace doris |