| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "service/point_query_executor.h" |
| |
| #include <fmt/format.h> |
| #include <gen_cpp/Descriptors_types.h> |
| #include <gen_cpp/Exprs_types.h> |
| #include <gen_cpp/PaloInternalService_types.h> |
| #include <gen_cpp/internal_service.pb.h> |
| #include <stdlib.h> |
| |
| #include <unordered_map> |
| #include <vector> |
| |
| #include "gutil/integral_types.h" |
| #include "olap/lru_cache.h" |
| #include "olap/olap_tuple.h" |
| #include "olap/row_cursor.h" |
| #include "olap/storage_engine.h" |
| #include "olap/tablet_manager.h" |
| #include "olap/tablet_schema.h" |
| #include "runtime/runtime_state.h" |
| #include "util/key_util.h" |
| #include "util/runtime_profile.h" |
| #include "util/simd/bits.h" |
| #include "util/thrift_util.h" |
| #include "vec/columns/columns_number.h" |
| #include "vec/data_types/serde/data_type_serde.h" |
| #include "vec/exprs/vexpr.h" |
| #include "vec/exprs/vexpr_context.h" |
| #include "vec/jsonb/serialize.h" |
| #include "vec/sink/vmysql_result_writer.h" |
| |
| namespace doris { |
| |
| Reusable::~Reusable() {} |
| constexpr static int s_preallocted_blocks_num = 32; |
| Status Reusable::init(const TDescriptorTable& t_desc_tbl, const std::vector<TExpr>& output_exprs, |
| const TQueryOptions& query_options, const TabletSchema& schema, |
| size_t block_size) { |
| SCOPED_MEM_COUNT(&_mem_size); |
| _runtime_state = RuntimeState::create_unique(); |
| _runtime_state->set_query_options(query_options); |
| RETURN_IF_ERROR(DescriptorTbl::create(_runtime_state->obj_pool(), t_desc_tbl, &_desc_tbl)); |
| _runtime_state->set_desc_tbl(_desc_tbl); |
| _block_pool.resize(block_size); |
| for (int i = 0; i < _block_pool.size(); ++i) { |
| _block_pool[i] = vectorized::Block::create_unique(tuple_desc()->slots(), 2); |
| // Name is useless but cost space |
| _block_pool[i]->clear_names(); |
| } |
| |
| RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(output_exprs, _output_exprs_ctxs)); |
| RowDescriptor row_desc(tuple_desc(), false); |
| // Prepare the exprs to run. |
| RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_exprs_ctxs, _runtime_state.get(), row_desc)); |
| _create_timestamp = butil::gettimeofday_ms(); |
| _data_type_serdes = vectorized::create_data_type_serdes(tuple_desc()->slots()); |
| _col_default_values.resize(tuple_desc()->slots().size()); |
| for (int i = 0; i < tuple_desc()->slots().size(); ++i) { |
| auto slot = tuple_desc()->slots()[i]; |
| _col_uid_to_idx[slot->col_unique_id()] = i; |
| _col_default_values[i] = slot->col_default_value(); |
| } |
| // get the delete sign idx in block |
| _delete_sign_idx = _col_uid_to_idx[schema.columns()[schema.delete_sign_idx()].unique_id()]; |
| return Status::OK(); |
| } |
| |
| std::unique_ptr<vectorized::Block> Reusable::get_block() { |
| std::lock_guard lock(_block_mutex); |
| if (_block_pool.empty()) { |
| auto block = vectorized::Block::create_unique(tuple_desc()->slots(), 2); |
| // Name is useless but cost space |
| block->clear_names(); |
| return block; |
| } |
| auto block = std::move(_block_pool.back()); |
| CHECK(block != nullptr); |
| _block_pool.pop_back(); |
| return block; |
| } |
| |
| void Reusable::return_block(std::unique_ptr<vectorized::Block>& block) { |
| std::lock_guard lock(_block_mutex); |
| if (block == nullptr) { |
| return; |
| } |
| block->clear_column_data(); |
| _block_pool.push_back(std::move(block)); |
| if (_block_pool.size() > s_preallocted_blocks_num) { |
| _block_pool.resize(s_preallocted_blocks_num); |
| } |
| } |
| |
| int64_t Reusable::mem_size() const { |
| return _mem_size; |
| } |
| |
| LookupConnectionCache* LookupConnectionCache::_s_instance = nullptr; |
| void LookupConnectionCache::create_global_instance(size_t capacity) { |
| DCHECK(_s_instance == nullptr); |
| static LookupConnectionCache instance(capacity); |
| _s_instance = &instance; |
| } |
| |
| RowCache* RowCache::_s_instance = nullptr; |
| |
| RowCache::RowCache(int64_t capacity, int num_shards) { |
| // Create Row Cache |
| _cache = std::unique_ptr<Cache>( |
| new_lru_cache("RowCache", capacity, LRUCacheType::SIZE, num_shards)); |
| } |
| |
| // Create global instance of this class |
| void RowCache::create_global_cache(int64_t capacity, uint32_t num_shards) { |
| DCHECK(_s_instance == nullptr); |
| static RowCache instance(capacity, num_shards); |
| _s_instance = &instance; |
| } |
| |
| RowCache* RowCache::instance() { |
| return _s_instance; |
| } |
| |
| bool RowCache::lookup(const RowCacheKey& key, CacheHandle* handle) { |
| const std::string& encoded_key = key.encode(); |
| auto lru_handle = _cache->lookup(encoded_key); |
| if (!lru_handle) { |
| // cache miss |
| return false; |
| } |
| *handle = CacheHandle(_cache.get(), lru_handle); |
| return true; |
| } |
| |
| void RowCache::insert(const RowCacheKey& key, const Slice& value) { |
| auto deleter = [](const doris::CacheKey& key, void* value) { free(value); }; |
| char* cache_value = static_cast<char*>(malloc(value.size)); |
| memcpy(cache_value, value.data, value.size); |
| const std::string& encoded_key = key.encode(); |
| auto handle = |
| _cache->insert(encoded_key, cache_value, value.size, deleter, CachePriority::NORMAL); |
| // handle will released |
| auto tmp = CacheHandle {_cache.get(), handle}; |
| } |
| |
| void RowCache::erase(const RowCacheKey& key) { |
| const std::string& encoded_key = key.encode(); |
| _cache->erase(encoded_key); |
| } |
| |
| Status PointQueryExecutor::init(const PTabletKeyLookupRequest* request, |
| PTabletKeyLookupResponse* response) { |
| SCOPED_TIMER(&_profile_metrics.init_ns); |
| _response = response; |
| // using cache |
| __int128_t uuid = |
| static_cast<__int128_t>(request->uuid().uuid_high()) << 64 | request->uuid().uuid_low(); |
| auto cache_handle = LookupConnectionCache::instance()->get(uuid); |
| _binary_row_format = request->is_binary_row(); |
| _tablet = StorageEngine::instance()->tablet_manager()->get_tablet(request->tablet_id()); |
| if (_tablet == nullptr) { |
| LOG(WARNING) << "failed to do tablet_fetch_data. tablet [" << request->tablet_id() |
| << "] is not exist"; |
| return Status::NotFound(fmt::format("tablet {} not exist", request->tablet_id())); |
| } |
| if (cache_handle != nullptr) { |
| _reusable = cache_handle; |
| _profile_metrics.hit_lookup_cache = true; |
| } else { |
| // init handle |
| auto reusable_ptr = std::make_shared<Reusable>(); |
| TDescriptorTable t_desc_tbl; |
| TExprList t_output_exprs; |
| uint32_t len = request->desc_tbl().size(); |
| RETURN_IF_ERROR( |
| deserialize_thrift_msg(reinterpret_cast<const uint8_t*>(request->desc_tbl().data()), |
| &len, false, &t_desc_tbl)); |
| len = request->output_expr().size(); |
| RETURN_IF_ERROR(deserialize_thrift_msg( |
| reinterpret_cast<const uint8_t*>(request->output_expr().data()), &len, false, |
| &t_output_exprs)); |
| _reusable = reusable_ptr; |
| TQueryOptions t_query_options; |
| len = request->query_options().size(); |
| if (request->has_query_options()) { |
| RETURN_IF_ERROR(deserialize_thrift_msg( |
| reinterpret_cast<const uint8_t*>(request->query_options().data()), &len, false, |
| &t_query_options)); |
| } |
| if (uuid != 0) { |
| // could be reused by requests after, pre allocte more blocks |
| RETURN_IF_ERROR(reusable_ptr->init(t_desc_tbl, t_output_exprs.exprs, t_query_options, |
| *_tablet->tablet_schema(), |
| s_preallocted_blocks_num)); |
| LookupConnectionCache::instance()->add(uuid, reusable_ptr); |
| } else { |
| RETURN_IF_ERROR(reusable_ptr->init(t_desc_tbl, t_output_exprs.exprs, t_query_options, |
| *_tablet->tablet_schema(), 1)); |
| } |
| } |
| RETURN_IF_ERROR(_init_keys(request)); |
| _result_block = _reusable->get_block(); |
| CHECK(_result_block != nullptr); |
| return Status::OK(); |
| } |
| |
| Status PointQueryExecutor::lookup_up() { |
| RETURN_IF_ERROR(_lookup_row_key()); |
| RETURN_IF_ERROR(_lookup_row_data()); |
| RETURN_IF_ERROR(_output_data()); |
| return Status::OK(); |
| } |
| |
| std::string PointQueryExecutor::print_profile() { |
| auto init_us = _profile_metrics.init_ns.value() / 1000; |
| auto init_key_us = _profile_metrics.init_key_ns.value() / 1000; |
| auto lookup_key_us = _profile_metrics.lookup_key_ns.value() / 1000; |
| auto lookup_data_us = _profile_metrics.lookup_data_ns.value() / 1000; |
| auto output_data_us = _profile_metrics.output_data_ns.value() / 1000; |
| auto total_us = init_us + lookup_key_us + lookup_data_us + output_data_us; |
| auto read_stats = _profile_metrics.read_stats; |
| return fmt::format( |
| "" |
| "[lookup profile:{}us] init:{}us, init_key:{}us," |
| "" |
| "" |
| "lookup_key:{}us, lookup_data:{}us, output_data:{}us, hit_lookup_cache:{}" |
| "" |
| "" |
| ", is_binary_row:{}, output_columns:{}, total_keys:{}, row_cache_hits:{}" |
| ", hit_cached_pages:{}, total_pages_read:{}, compressed_bytes_read:{}, " |
| "io_latency:{}ns, " |
| "uncompressed_bytes_read:{}, result_data_bytes:{}" |
| "", |
| total_us, init_us, init_key_us, lookup_key_us, lookup_data_us, output_data_us, |
| _profile_metrics.hit_lookup_cache, _binary_row_format, _reusable->output_exprs().size(), |
| _row_read_ctxs.size(), _profile_metrics.row_cache_hits, read_stats.cached_pages_num, |
| read_stats.total_pages_num, read_stats.compressed_bytes_read, read_stats.io_ns, |
| read_stats.uncompressed_bytes_read, _profile_metrics.result_data_bytes); |
| } |
| |
| Status PointQueryExecutor::_init_keys(const PTabletKeyLookupRequest* request) { |
| SCOPED_TIMER(&_profile_metrics.init_key_ns); |
| // 1. get primary key from conditions |
| std::vector<OlapTuple> olap_tuples; |
| olap_tuples.resize(request->key_tuples().size()); |
| for (size_t i = 0; i < request->key_tuples().size(); ++i) { |
| const KeyTuple& key_tuple = request->key_tuples(i); |
| for (const std::string& key_col : key_tuple.key_column_rep()) { |
| olap_tuples[i].add_value(key_col); |
| } |
| } |
| _row_read_ctxs.resize(olap_tuples.size()); |
| // get row cursor and encode keys |
| for (size_t i = 0; i < olap_tuples.size(); ++i) { |
| RowCursor cursor; |
| RETURN_IF_ERROR(cursor.init_scan_key(_tablet->tablet_schema(), olap_tuples[i].values())); |
| RETURN_IF_ERROR(cursor.from_tuple(olap_tuples[i])); |
| encode_key_with_padding<RowCursor, true, true>(&_row_read_ctxs[i]._primary_key, cursor, |
| _tablet->tablet_schema()->num_key_columns(), |
| true); |
| } |
| return Status::OK(); |
| } |
| |
| Status PointQueryExecutor::_lookup_row_key() { |
| SCOPED_TIMER(&_profile_metrics.lookup_key_ns); |
| // 2. lookup row location |
| Status st; |
| std::vector<RowsetSharedPtr> specified_rowsets; |
| { |
| std::shared_lock rlock(_tablet->get_header_lock()); |
| specified_rowsets = _tablet->get_rowset_by_ids(nullptr); |
| } |
| std::vector<std::unique_ptr<SegmentCacheHandle>> segment_caches(specified_rowsets.size()); |
| for (size_t i = 0; i < _row_read_ctxs.size(); ++i) { |
| RowLocation location; |
| if (!config::disable_storage_row_cache) { |
| RowCache::CacheHandle cache_handle; |
| auto hit_cache = RowCache::instance()->lookup( |
| {_tablet->tablet_id(), _row_read_ctxs[i]._primary_key}, &cache_handle); |
| if (hit_cache) { |
| _row_read_ctxs[i]._cached_row_data = std::move(cache_handle); |
| ++_profile_metrics.row_cache_hits; |
| continue; |
| } |
| } |
| // Get rowlocation and rowset, ctx._rowset_ptr will acquire wrap this ptr |
| auto rowset_ptr = std::make_unique<RowsetSharedPtr>(); |
| st = (_tablet->lookup_row_key(_row_read_ctxs[i]._primary_key, nullptr, false, |
| specified_rowsets, &location, INT32_MAX /*rethink?*/, |
| segment_caches, rowset_ptr.get())); |
| if (st.is<ErrorCode::KEY_NOT_FOUND>()) { |
| continue; |
| } |
| RETURN_IF_ERROR(st); |
| _row_read_ctxs[i]._row_location = location; |
| // acquire and wrap this rowset |
| (*rowset_ptr)->acquire(); |
| VLOG_DEBUG << "aquire rowset " << (*rowset_ptr)->unique_id(); |
| _row_read_ctxs[i]._rowset_ptr = std::unique_ptr<RowsetSharedPtr, decltype(&release_rowset)>( |
| rowset_ptr.release(), &release_rowset); |
| } |
| return Status::OK(); |
| } |
| |
| Status PointQueryExecutor::_lookup_row_data() { |
| // 3. get values |
| SCOPED_TIMER(&_profile_metrics.lookup_data_ns); |
| for (size_t i = 0; i < _row_read_ctxs.size(); ++i) { |
| if (_row_read_ctxs[i]._cached_row_data.valid()) { |
| vectorized::JsonbSerializeUtil::jsonb_to_block( |
| _reusable->get_data_type_serdes(), |
| _row_read_ctxs[i]._cached_row_data.data().data, |
| _row_read_ctxs[i]._cached_row_data.data().size, _reusable->get_col_uid_to_idx(), |
| *_result_block, _reusable->get_col_default_values()); |
| continue; |
| } |
| if (!_row_read_ctxs[i]._row_location.has_value()) { |
| continue; |
| } |
| std::string value; |
| RETURN_IF_ERROR(_tablet->lookup_row_data( |
| _row_read_ctxs[i]._primary_key, _row_read_ctxs[i]._row_location.value(), |
| *(_row_read_ctxs[i]._rowset_ptr), _reusable->tuple_desc(), |
| _profile_metrics.read_stats, value, |
| !config::disable_storage_row_cache /*whether write row cache*/)); |
| // serilize value to block, currently only jsonb row formt |
| vectorized::JsonbSerializeUtil::jsonb_to_block( |
| _reusable->get_data_type_serdes(), value.data(), value.size(), |
| _reusable->get_col_uid_to_idx(), *_result_block, |
| _reusable->get_col_default_values()); |
| } |
| // filter rows by delete sign |
| if (_result_block->rows() > 0 && _reusable->delete_sign_idx() != -1) { |
| vectorized::ColumnPtr delete_filter_columns = |
| _result_block->get_columns()[_reusable->delete_sign_idx()]; |
| const auto& filter = |
| assert_cast<const vectorized::ColumnInt8*>(delete_filter_columns.get())->get_data(); |
| size_t count = filter.size() - simd::count_zero_num((int8_t*)filter.data(), filter.size()); |
| if (count == filter.size()) { |
| _result_block->clear(); |
| } else if (count > 0) { |
| return Status::NotSupported("Not implemented since only single row at present"); |
| } |
| } |
| return Status::OK(); |
| } |
| |
| template <typename MysqlWriter> |
| Status _serialize_block(MysqlWriter& mysql_writer, vectorized::Block& block, |
| PTabletKeyLookupResponse* response) { |
| block.clear_names(); |
| RETURN_IF_ERROR(mysql_writer.append_block(block)); |
| assert(mysql_writer.results().size() == 1); |
| uint8_t* buf = nullptr; |
| uint32_t len = 0; |
| ThriftSerializer ser(false, 4096); |
| RETURN_IF_ERROR(ser.serialize(&(mysql_writer.results()[0])->result_batch, &len, &buf)); |
| response->set_row_batch(std::string((const char*)buf, len)); |
| return Status::OK(); |
| } |
| |
| Status PointQueryExecutor::_output_data() { |
| // 4. exprs exec and serialize to mysql row batches |
| SCOPED_TIMER(&_profile_metrics.output_data_ns); |
| if (_result_block->rows()) { |
| // TODO reuse mysql_writer |
| if (_binary_row_format) { |
| vectorized::VMysqlResultWriter<true> mysql_writer(nullptr, _reusable->output_exprs(), |
| nullptr); |
| RETURN_IF_ERROR(_serialize_block(mysql_writer, *_result_block, _response)); |
| } else { |
| vectorized::VMysqlResultWriter<false> mysql_writer(nullptr, _reusable->output_exprs(), |
| nullptr); |
| RETURN_IF_ERROR(_serialize_block(mysql_writer, *_result_block, _response)); |
| } |
| VLOG_DEBUG << "dump block " << _result_block->dump_data(); |
| } else { |
| _response->set_empty_batch(true); |
| } |
| _profile_metrics.result_data_bytes = _result_block->bytes(); |
| _reusable->return_block(_result_block); |
| return Status::OK(); |
| } |
| |
| } // namespace doris |