blob: a8f8eadd1f240edd6f4eb625154c399effaceef5 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "service/point_query_executor.h"
#include <fmt/format.h>
#include <gen_cpp/Descriptors_types.h>
#include <gen_cpp/Exprs_types.h>
#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/internal_service.pb.h>
#include <glog/logging.h>
#include <google/protobuf/extension_set.h>
#include <stdlib.h>
#include <memory>
#include <unordered_map>
#include <vector>
#include "cloud/cloud_tablet.h"
#include "cloud/config.h"
#include "common/cast_set.h"
#include "common/consts.h"
#include "common/status.h"
#include "olap/lru_cache.h"
#include "olap/olap_tuple.h"
#include "olap/row_cursor.h"
#include "olap/rowset/beta_rowset.h"
#include "olap/rowset/rowset_fwd.h"
#include "olap/rowset/segment_v2/column_reader.h"
#include "olap/tablet_schema.h"
#include "olap/utils.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "runtime/result_block_buffer.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
#include "util/key_util.h"
#include "util/runtime_profile.h"
#include "util/simd/bits.h"
#include "util/thrift_util.h"
#include "vec/data_types/serde/data_type_serde.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
#include "vec/exprs/vexpr_fwd.h"
#include "vec/exprs/vslot_ref.h"
#include "vec/jsonb/serialize.h"
#include "vec/sink/vmysql_result_writer.h"
namespace doris {
#include "common/compile_check_begin.h"
// Result buffer for a point query. A point query always produces exactly one
// MySQL result batch, so this subclass exposes a getter that pops that single
// batch from the queue the base class fills.
class PointQueryResultBlockBuffer final : public vectorized::MySQLResultBlockBuffer {
public:
    PointQueryResultBlockBuffer(RuntimeState* state) : vectorized::MySQLResultBlockBuffer(state) {}
    ~PointQueryResultBlockBuffer() override = default;

    // Pop and return the single buffered result batch. Asserts (debug builds)
    // that exactly one batch is queued.
    std::shared_ptr<TFetchDataResult> get_block() {
        std::lock_guard<std::mutex> guard(_lock);
        DCHECK_EQ(_result_batch_queue.size(), 1);
        auto front_batch = std::move(_result_batch_queue.front());
        _result_batch_queue.pop_front();
        return front_batch;
    }
};
// Out-of-line defaulted destructor: keeps destruction of members whose types
// may only be forward-declared in the header inside this translation unit.
Reusable::~Reusable() = default;
// Split the output columns into those covered by the row-store column group
// (include_cids) and those that must be read from the column store
// (missing_cids).
//   slots               : slot descriptors of the output exprs
//   target_rs_column_id : unique id of the row-store column, or -1 if none
static void get_missing_and_include_cids(const TabletSchema& schema,
                                         const std::vector<SlotDescriptor*>& slots,
                                         int target_rs_column_id,
                                         std::unordered_set<int>& missing_cids,
                                         std::unordered_set<int>& include_cids) {
    missing_cids.clear();
    include_cids.clear();
    // Start out assuming every output column is missing.
    for (const auto* slot : slots) {
        missing_cids.insert(slot->col_unique_id());
    }
    // The delete-sign column is always needed to filter deleted rows.
    missing_cids.insert(schema.columns()[schema.delete_sign_idx()]->unique_id());
    if (target_rs_column_id == -1) {
        // No row-store column at all: everything stays missing.
        return;
    }
    const TabletColumn& row_store_col = schema.column_by_uid(target_rs_column_id);
    DCHECK(row_store_col.is_row_store_column());
    // An empty uid list means the row column covers the full schema, so the
    // row store is a full match and nothing is missing.
    if (schema.row_columns_uids().empty()) {
        missing_cids.clear();
        return;
    }
    // Move every uid stored in the row column group from missing to included.
    for (int uid : schema.row_columns_uids()) {
        missing_cids.erase(uid);
        include_cids.insert(uid);
    }
}
// Number of result blocks pre-allocated for a cached Reusable, and the cap on
// how many returned blocks its pool retains.
constexpr static int s_preallocted_blocks_num = 32;
// Recursively collect the SlotDescriptors referenced by every SLOT_REF node in
// the expression tree rooted at `expr` (children are visited first).
static void extract_slot_ref(const vectorized::VExprSPtr& expr, TupleDescriptor* tuple_desc,
                             std::vector<SlotDescriptor*>& slots) {
    for (const auto& child : expr->children()) {
        extract_slot_ref(child, tuple_desc, slots);
    }
    if (expr->node_type() == TExprNodeType::SLOT_REF) {
        const auto* slot_ref = static_cast<const vectorized::VSlotRef*>(expr.get());
        slots.push_back(tuple_desc->slots()[slot_ref->column_id()]);
    }
}
// Build the per-connection reusable state for point queries: a RuntimeState,
// the descriptor table, prepared/opened output expr contexts, a pool of
// `block_size` pre-allocated result blocks, data-type serdes, and the
// column-unique-id -> block-index mapping used when filling result blocks.
Status Reusable::init(const TDescriptorTable& t_desc_tbl, const std::vector<TExpr>& output_exprs,
                      const TQueryOptions& query_options, const TabletSchema& schema,
                      size_t block_size) {
    _runtime_state = RuntimeState::create_unique();
    _runtime_state->set_query_options(query_options);
    RETURN_IF_ERROR(DescriptorTbl::create(_runtime_state->obj_pool(), t_desc_tbl, &_desc_tbl));
    _runtime_state->set_desc_tbl(_desc_tbl);
    // Pre-allocate blocks so repeated lookups on this connection avoid allocation.
    _block_pool.resize(block_size);
    for (auto& i : _block_pool) {
        i = vectorized::Block::create_unique(tuple_desc()->slots(), 2);
        // Name is useless but cost space
        i->clear_names();
    }
    RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(output_exprs, _output_exprs_ctxs));
    RowDescriptor row_desc(tuple_desc());
    // Prepare the exprs to run.
    RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_exprs_ctxs, _runtime_state.get(), row_desc));
    RETURN_IF_ERROR(vectorized::VExpr::open(_output_exprs_ctxs, _runtime_state.get()));
    _create_timestamp = butil::gettimeofday_ms();
    _data_type_serdes = vectorized::create_data_type_serdes(tuple_desc()->slots());
    _col_default_values.resize(tuple_desc()->slots().size());
    // Record each slot's position in the block, its default value, and whether
    // the tuple contains the delete-sign column.
    bool has_delete_sign = false;
    for (int i = 0; i < tuple_desc()->slots().size(); ++i) {
        auto* slot = tuple_desc()->slots()[i];
        if (slot->col_name() == DELETE_SIGN) {
            has_delete_sign = true;
        }
        _col_uid_to_idx[slot->col_unique_id()] = i;
        _col_default_values[i] = slot->col_default_value();
    }
    // Get the output slot descriptors
    std::vector<SlotDescriptor*> output_slot_descs;
    for (const auto& expr : _output_exprs_ctxs) {
        extract_slot_ref(expr->root(), tuple_desc(), output_slot_descs);
    }
    // get the delete sign idx in block
    if (has_delete_sign) {
        _delete_sign_idx = _col_uid_to_idx[schema.columns()[schema.delete_sign_idx()]->unique_id()];
    }
    // Remember the unique id of the row-store column, if the schema has one;
    // otherwise _row_store_column_ids keeps its "no row store" default.
    if (schema.have_column(BeConsts::ROW_STORE_COL)) {
        const auto& column = *DORIS_TRY(schema.column(BeConsts::ROW_STORE_COL));
        _row_store_column_ids = column.unique_id();
    }
    get_missing_and_include_cids(schema, output_slot_descs, _row_store_column_ids,
                                 _missing_col_uids, _include_col_uids);
    return Status::OK();
}
// Hand out a block from the pool; fall back to a fresh allocation when the
// pool is exhausted.
std::unique_ptr<vectorized::Block> Reusable::get_block() {
    std::lock_guard lock(_block_mutex);
    if (!_block_pool.empty()) {
        auto pooled = std::move(_block_pool.back());
        _block_pool.pop_back();
        CHECK(pooled != nullptr);
        return pooled;
    }
    // Pool empty: allocate on demand.
    auto fresh = vectorized::Block::create_unique(tuple_desc()->slots(), 2);
    // Column names are unused here and only cost space.
    fresh->clear_names();
    return fresh;
}
// Return a block to the pool for reuse; the pool is capped at
// s_preallocted_blocks_num so a burst of queries does not pin memory forever.
void Reusable::return_block(std::unique_ptr<vectorized::Block>& block) {
    if (block == nullptr) {
        return;
    }
    std::lock_guard lock(_block_mutex);
    block->clear_column_data();
    _block_pool.push_back(std::move(block));
    // Trim surplus blocks beyond the pre-allocation cap.
    if (_block_pool.size() > s_preallocted_blocks_num) {
        _block_pool.resize(s_preallocted_blocks_num);
    }
}
// Create the process-wide lookup connection cache. Must be called at most
// once, before the ExecEnv slot is populated (debug-asserted).
LookupConnectionCache* LookupConnectionCache::create_global_instance(size_t capacity) {
    DCHECK(ExecEnv::GetInstance()->get_lookup_connection_cache() == nullptr);
    return new LookupConnectionCache(capacity);
}
// Row cache measured in bytes (LRUCacheType::SIZE), sharded, with pruning
// enabled and LRU-K admission; stale entries are swept on the interval set by
// config::point_query_row_cache_stale_sweep_time_sec.
RowCache::RowCache(int64_t capacity, int num_shards)
        : LRUCachePolicy(CachePolicy::CacheType::POINT_QUERY_ROW_CACHE, capacity,
                         LRUCacheType::SIZE, config::point_query_row_cache_stale_sweep_time_sec,
                         num_shards, /*element count capacity */ 0,
                         /*enable prune*/ true, /*is lru-k*/ true) {}
// Create the process-wide row cache. Must be called at most once, before the
// ExecEnv slot is populated (debug-asserted).
RowCache* RowCache::create_global_cache(int64_t capacity, uint32_t num_shards) {
    DCHECK(ExecEnv::GetInstance()->get_row_cache() == nullptr);
    return new RowCache(capacity, num_shards);
}
// Accessor for the global row cache created by create_global_cache().
RowCache* RowCache::instance() {
    return ExecEnv::GetInstance()->get_row_cache();
}
// Look up a cached row by key. On a hit, `handle` pins the entry and true is
// returned; on a miss, `handle` is untouched and false is returned.
bool RowCache::lookup(const RowCacheKey& key, CacheHandle* handle) {
    const std::string& encoded = key.encode();
    auto* raw_handle = LRUCachePolicy::lookup(encoded);
    if (raw_handle == nullptr) {
        // cache miss
        return false;
    }
    *handle = CacheHandle(this, raw_handle);
    return true;
}
// Copy `value` into the cache under `key`. The entry's charge is the row size.
void RowCache::insert(const RowCacheKey& key, const Slice& value) {
    // NOTE(review): malloc result is not null-checked; the memcpy below would
    // crash on allocation failure — consider handling OOM explicitly.
    char* cache_value = static_cast<char*>(malloc(value.size));
    memcpy(cache_value, value.data, value.size);
    // Ownership of both the RowCacheValue and the malloc'd buffer transfers to
    // the cache — presumably freed by the cache's entry deleter; verify against
    // LRUCachePolicy/RowCacheValue.
    auto* row_cache_value = new RowCacheValue;
    row_cache_value->cache_value = cache_value;
    const std::string& encoded_key = key.encode();
    auto* handle = LRUCachePolicy::insert(encoded_key, row_cache_value, value.size, value.size,
                                          CachePriority::NORMAL);
    // Wrap the handle in a temporary CacheHandle so the pin is released at end
    // of scope; the entry itself stays resident in the cache.
    auto tmp = CacheHandle {this, handle};
}
// Evict the entry for `key`, if present.
void RowCache::erase(const RowCacheKey& key) {
    const std::string& encoded = key.encode();
    LRUCachePolicy::erase(encoded);
}
// Destroy the cached Reusable under the point-query mem tracker so the freed
// memory is attributed to the point-query executor.
LookupConnectionCache::CacheValue::~CacheValue() {
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
            ExecEnv::GetInstance()->point_query_executor_mem_tracker());
    item.reset();
}
// Release all per-query resources under the point-query mem tracker so their
// deallocation is accounted to the right consumer.
PointQueryExecutor::~PointQueryExecutor() {
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
            ExecEnv::GetInstance()->point_query_executor_mem_tracker());
    _tablet.reset();
    _reusable.reset();
    _result_block.reset();
    _row_read_ctxs.clear();
}
// Initialize the executor from a lookup RPC: resolve the tablet, reuse a
// cached Reusable keyed by the request uuid (or build one from the serialized
// thrift descriptor table / output exprs / query options), then encode the
// requested primary keys and grab a result block.
Status PointQueryExecutor::init(const PTabletKeyLookupRequest* request,
                                PTabletKeyLookupResponse* response) {
    SCOPED_TIMER(&_profile_metrics.init_ns);
    _response = response;
    // using cache
    __int128_t uuid =
            static_cast<__int128_t>(request->uuid().uuid_high()) << 64 | request->uuid().uuid_low();
    SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->point_query_executor_mem_tracker());
    auto cache_handle = LookupConnectionCache::instance()->get(uuid);
    _binary_row_format = request->is_binary_row();
    _tablet = DORIS_TRY(ExecEnv::get_tablet(request->tablet_id()));
    if (cache_handle != nullptr) {
        _reusable = cache_handle;
        _profile_metrics.hit_lookup_cache = true;
    } else {
        // Cache miss: deserialize the plan pieces shipped with the request and
        // build a fresh Reusable.
        auto reusable_ptr = std::make_shared<Reusable>();
        TDescriptorTable t_desc_tbl;
        TExprList t_output_exprs;
        auto len = cast_set<uint32_t>(request->desc_tbl().size());
        RETURN_IF_ERROR(
                deserialize_thrift_msg(reinterpret_cast<const uint8_t*>(request->desc_tbl().data()),
                                       &len, false, &t_desc_tbl));
        len = cast_set<uint32_t>(request->output_expr().size());
        RETURN_IF_ERROR(deserialize_thrift_msg(
                reinterpret_cast<const uint8_t*>(request->output_expr().data()), &len, false,
                &t_output_exprs));
        _reusable = reusable_ptr;
        TQueryOptions t_query_options;
        if (request->has_query_options()) {
            // Compute the buffer length only when the field is actually present
            // (previously it was computed unconditionally and possibly unused).
            len = cast_set<uint32_t>(request->query_options().size());
            RETURN_IF_ERROR(deserialize_thrift_msg(
                    reinterpret_cast<const uint8_t*>(request->query_options().data()), &len, false,
                    &t_query_options));
        }
        if (uuid != 0) {
            // could be reused by requests after, pre allocte more blocks
            RETURN_IF_ERROR(reusable_ptr->init(t_desc_tbl, t_output_exprs.exprs, t_query_options,
                                               *_tablet->tablet_schema(),
                                               s_preallocted_blocks_num));
            LookupConnectionCache::instance()->add(uuid, reusable_ptr);
        } else {
            // uuid == 0: not cacheable, a single block is enough.
            RETURN_IF_ERROR(reusable_ptr->init(t_desc_tbl, t_output_exprs.exprs, t_query_options,
                                               *_tablet->tablet_schema(), 1));
        }
    }
    if (request->has_version() && request->version() >= 0) {
        _version = request->version();
    }
    RETURN_IF_ERROR(_init_keys(request));
    _result_block = _reusable->get_block();
    CHECK(_result_block != nullptr);
    return Status::OK();
}
// Run the whole point lookup pipeline: key -> row location -> row data ->
// mysql-formatted output, attributed to the point-query mem tracker.
Status PointQueryExecutor::lookup_up() {
    SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->point_query_executor_mem_tracker());
    RETURN_IF_ERROR(_lookup_row_key());
    RETURN_IF_ERROR(_lookup_row_data());
    RETURN_IF_ERROR(_output_data());
    return Status::OK();
}
// Log a one-line timing/statistics breakdown of this lookup. Slow queries
// (over kSlowThresholdUs total) log at WARNING; otherwise verbose or sampled
// INFO logging is used.
void PointQueryExecutor::print_profile() {
    auto init_us = _profile_metrics.init_ns.value() / 1000;
    auto init_key_us = _profile_metrics.init_key_ns.value() / 1000;
    auto lookup_key_us = _profile_metrics.lookup_key_ns.value() / 1000;
    auto lookup_data_us = _profile_metrics.lookup_data_ns.value() / 1000;
    auto output_data_us = _profile_metrics.output_data_ns.value() / 1000;
    auto load_segments_key_us = _profile_metrics.load_segment_key_stage_ns.value() / 1000;
    auto load_segments_data_us = _profile_metrics.load_segment_data_stage_ns.value() / 1000;
    auto total_us = init_us + lookup_key_us + lookup_data_us + output_data_us;
    // Bind by reference: the stats struct is read-only here, copying it is
    // wasted work (clang-tidy performance-unnecessary-copy-initialization).
    const auto& read_stats = _profile_metrics.read_stats;
    const std::string stats_str = fmt::format(
            "[lookup profile:{}us] init:{}us, init_key:{}us,"
            " lookup_key:{}us, load_segments_key:{}us, lookup_data:{}us, load_segments_data:{}us,"
            " output_data:{}us, "
            "hit_lookup_cache:{}"
            ", is_binary_row:{}, output_columns:{}, total_keys:{}, row_cache_hits:{}"
            ", hit_cached_pages:{}, total_pages_read:{}, compressed_bytes_read:{}, "
            "io_latency:{}ns, "
            "uncompressed_bytes_read:{}, result_data_bytes:{}, row_hits:{}"
            ", rs_column_uid:{}, bytes_read_from_local:{}, bytes_read_from_remote:{}, "
            "local_io_timer:{}, remote_io_timer:{}, local_write_timer:{}",
            total_us, init_us, init_key_us, lookup_key_us, load_segments_key_us, lookup_data_us,
            load_segments_data_us, output_data_us, _profile_metrics.hit_lookup_cache,
            _binary_row_format, _reusable->output_exprs().size(), _row_read_ctxs.size(),
            _profile_metrics.row_cache_hits, read_stats.cached_pages_num,
            read_stats.total_pages_num, read_stats.compressed_bytes_read, read_stats.io_ns,
            read_stats.uncompressed_bytes_read, _profile_metrics.result_data_bytes, _row_hits,
            _reusable->rs_column_uid(),
            _profile_metrics.read_stats.file_cache_stats.bytes_read_from_local,
            _profile_metrics.read_stats.file_cache_stats.bytes_read_from_remote,
            _profile_metrics.read_stats.file_cache_stats.local_io_timer,
            _profile_metrics.read_stats.file_cache_stats.remote_io_timer,
            _profile_metrics.read_stats.file_cache_stats.write_cache_io_timer);
    constexpr static int kSlowThresholdUs = 50 * 1000; // 50ms
    if (total_us > kSlowThresholdUs) {
        LOG(WARNING) << "slow query, " << stats_str;
    } else if (VLOG_DEBUG_IS_ON) {
        VLOG_DEBUG << stats_str;
    } else {
        LOG_EVERY_N(INFO, 1000) << stats_str;
    }
}
// Decode the key tuples carried by the request and encode each one into a
// padded primary-key slice stored in _row_read_ctxs.
Status PointQueryExecutor::_init_keys(const PTabletKeyLookupRequest* request) {
    SCOPED_TIMER(&_profile_metrics.init_key_ns);
    // 1. Collect the raw key column values into olap tuples.
    std::vector<OlapTuple> olap_tuples(request->key_tuples().size());
    for (int i = 0; i < request->key_tuples().size(); ++i) {
        for (const std::string& key_col : request->key_tuples(i).key_column_rep()) {
            olap_tuples[i].add_value(key_col);
        }
    }
    _row_read_ctxs.resize(olap_tuples.size());
    // 2. Build a row cursor per tuple and encode the padded primary key.
    for (size_t idx = 0; idx < olap_tuples.size(); ++idx) {
        RowCursor cursor;
        RETURN_IF_ERROR(cursor.init_scan_key(_tablet->tablet_schema(), olap_tuples[idx].values()));
        RETURN_IF_ERROR(cursor.from_tuple(olap_tuples[idx]));
        encode_key_with_padding<RowCursor, true>(&_row_read_ctxs[idx]._primary_key, cursor,
                                                 _tablet->tablet_schema()->num_key_columns(), true);
    }
    return Status::OK();
}
// Resolve each encoded primary key to either a row-cache hit (cached row
// bytes) or a RowLocation plus a pinned rowset for the later data read.
Status PointQueryExecutor::_lookup_row_key() {
    SCOPED_TIMER(&_profile_metrics.lookup_key_ns);
    // 2. lookup row location
    Status st;
    if (_version >= 0) {
        // Snapshot read: sync rowsets up to the requested version first.
        CHECK(config::is_cloud_mode()) << "Only cloud mode support snapshot read at present";
        SyncOptions options;
        options.query_version = _version;
        RETURN_IF_ERROR(std::dynamic_pointer_cast<CloudTablet>(_tablet)->sync_rowsets(options));
    }
    // Snapshot the rowset list under the header lock so lookups below run
    // against a consistent view.
    std::vector<RowsetSharedPtr> specified_rowsets;
    {
        std::shared_lock rlock(_tablet->get_header_lock());
        specified_rowsets = _tablet->get_rowset_by_ids(nullptr);
    }
    std::vector<std::unique_ptr<SegmentCacheHandle>> segment_caches(specified_rowsets.size());
    for (size_t i = 0; i < _row_read_ctxs.size(); ++i) {
        RowLocation location;
        // Try the row cache first unless it is disabled.
        if (!config::disable_storage_row_cache) {
            RowCache::CacheHandle cache_handle;
            auto hit_cache = RowCache::instance()->lookup(
                    {_tablet->tablet_id(), _row_read_ctxs[i]._primary_key}, &cache_handle);
            if (hit_cache) {
                _row_read_ctxs[i]._cached_row_data = std::move(cache_handle);
                ++_profile_metrics.row_cache_hits;
                continue;
            }
        }
        // Get rowlocation and rowset, ctx._rowset_ptr will acquire wrap this ptr
        auto rowset_ptr = std::make_unique<RowsetSharedPtr>();
        st = (_tablet->lookup_row_key(_row_read_ctxs[i]._primary_key, nullptr, false,
                                      specified_rowsets, &location, INT32_MAX /*rethink?*/,
                                      segment_caches, rowset_ptr.get(), false, nullptr,
                                      &_profile_metrics.read_stats));
        // A missing key is not an error for a point query: it simply yields no row.
        if (st.is<ErrorCode::KEY_NOT_FOUND>()) {
            continue;
        }
        RETURN_IF_ERROR(st);
        _row_read_ctxs[i]._row_location = location;
        // acquire and wrap this rowset
        // NOTE(review): acquire() presumably bumps the rowset's ref count and
        // release_rowset (the custom deleter below) undoes it — verify in Rowset.
        (*rowset_ptr)->acquire();
        VLOG_DEBUG << "aquire rowset " << (*rowset_ptr)->rowset_id();
        _row_read_ctxs[i]._rowset_ptr = std::unique_ptr<RowsetSharedPtr, decltype(&release_rowset)>(
                rowset_ptr.release(), &release_rowset);
        _row_hits++;
    }
    return Status::OK();
}
// Materialize the located rows into _result_block: cached jsonb rows are
// deserialized directly, row-store rows are fetched from the tablet, and any
// columns absent from the row store are read individually from the column
// store. Finally, short columns are padded and delete-signed rows filtered.
Status PointQueryExecutor::_lookup_row_data() {
    // 3. get values
    SCOPED_TIMER(&_profile_metrics.lookup_data_ns);
    for (size_t i = 0; i < _row_read_ctxs.size(); ++i) {
        // Row-cache hit: deserialize the cached jsonb row straight into the block.
        if (_row_read_ctxs[i]._cached_row_data.valid()) {
            RETURN_IF_ERROR(vectorized::JsonbSerializeUtil::jsonb_to_block(
                    _reusable->get_data_type_serdes(),
                    _row_read_ctxs[i]._cached_row_data.data().data,
                    _row_read_ctxs[i]._cached_row_data.data().size, _reusable->get_col_uid_to_idx(),
                    *_result_block, _reusable->get_col_default_values(),
                    _reusable->include_col_uids()));
            continue;
        }
        // Key not found in any rowset: nothing to fetch for this key.
        if (!_row_read_ctxs[i]._row_location.has_value()) {
            continue;
        }
        std::string value;
        // fill block by row store
        if (_reusable->rs_column_uid() != -1) {
            bool use_row_cache = !config::disable_storage_row_cache;
            RETURN_IF_ERROR(_tablet->lookup_row_data(
                    _row_read_ctxs[i]._primary_key, _row_read_ctxs[i]._row_location.value(),
                    *(_row_read_ctxs[i]._rowset_ptr), _profile_metrics.read_stats, value,
                    use_row_cache));
            // serialize value to block, currently only jsonb row format
            RETURN_IF_ERROR(vectorized::JsonbSerializeUtil::jsonb_to_block(
                    _reusable->get_data_type_serdes(), value.data(), value.size(),
                    _reusable->get_col_uid_to_idx(), *_result_block,
                    _reusable->get_col_default_values(), _reusable->include_col_uids()));
        }
        if (!_reusable->missing_col_uids().empty()) {
            if (!_reusable->runtime_state()->enable_short_circuit_query_access_column_store()) {
                std::string missing_columns;
                for (int cid : _reusable->missing_col_uids()) {
                    missing_columns += _tablet->tablet_schema()->column_by_uid(cid).name() + ",";
                }
                return Status::InternalError(
                        "Not support column store, set store_row_column=true or row_store_columns "
                        "in table "
                        "properties, missing columns: " +
                        missing_columns + " should be added to row store");
            }
            // fill missing columns by column store
            RowLocation row_loc = _row_read_ctxs[i]._row_location.value();
            BetaRowsetSharedPtr rowset =
                    std::static_pointer_cast<BetaRowset>(_tablet->get_rowset(row_loc.rowset_id));
            SegmentCacheHandle segment_cache;
            {
                SCOPED_TIMER(&_profile_metrics.load_segment_data_stage_ns);
                RETURN_IF_ERROR(
                        SegmentLoader::instance()->load_segments(rowset, &segment_cache, true));
            }
            // find segment
            auto it = std::find_if(segment_cache.get_segments().cbegin(),
                                   segment_cache.get_segments().cend(),
                                   [&](const segment_v2::SegmentSharedPtr& seg) {
                                       return seg->id() == row_loc.segment_id;
                                   });
            // BUGFIX: the original dereferenced `it` unconditionally; if the
            // segment is somehow absent (e.g. inconsistent cache state) that is
            // undefined behavior. Fail with an explicit error instead.
            if (it == segment_cache.get_segments().cend()) {
                return Status::InternalError("segment {} not found in rowset {}",
                                             row_loc.segment_id, rowset->rowset_id().to_string());
            }
            const auto& segment = *it;
            for (int cid : _reusable->missing_col_uids()) {
                int pos = _reusable->get_col_uid_to_idx().at(cid);
                auto row_id = static_cast<segment_v2::rowid_t>(row_loc.row_id);
                vectorized::MutableColumnPtr column =
                        _result_block->get_by_position(pos).column->assume_mutable();
                std::unique_ptr<ColumnIterator> iter;
                SlotDescriptor* slot = _reusable->tuple_desc()->slots()[pos];
                StorageReadOptions storage_read_options;
                storage_read_options.stats = &_read_stats;
                storage_read_options.io_ctx.reader_type = ReaderType::READER_QUERY;
                RETURN_IF_ERROR(segment->seek_and_read_by_rowid(*_tablet->tablet_schema(), slot,
                                                                row_id, column,
                                                                storage_read_options, iter));
                // CHAR columns are stored padded; strip the trailing pad chars.
                if (_tablet->tablet_schema()
                            ->column_by_uid(slot->col_unique_id())
                            .has_char_type()) {
                    column->shrink_padding_chars();
                }
            }
        }
    }
    if (_result_block->columns() > _reusable->include_col_uids().size()) {
        // Padding rows for some columns that no need to output to mysql client
        // eg. SELECT k1,v1,v2 FROM TABLE WHERE k1 = 1, k1 is not in output slots, tuple as below
        // TupleDescriptor{id=1, tbl=table_with_column_group}
        //   SlotDescriptor{id=8, col=v1, colUniqueId=1 ...}
        //   SlotDescriptor{id=9, col=v2, colUniqueId=2 ...}
        // thus missing in include_col_uids and missing_col_uids
        for (size_t i = 0; i < _result_block->columns(); ++i) {
            auto column = _result_block->get_by_position(i).column;
            int padding_rows = _row_hits - cast_set<int>(column->size());
            if (padding_rows > 0) {
                column->assume_mutable()->insert_many_defaults(padding_rows);
            }
        }
    }
    // filter rows by delete sign
    if (_row_hits > 0 && _reusable->delete_sign_idx() != -1) {
        size_t filtered = 0;
        size_t total = 0;
        {
            // clear_column_data will check reference of ColumnPtr, so we need to release
            // reference before clear_column_data
            vectorized::ColumnPtr delete_filter_columns =
                    _result_block->get_columns()[_reusable->delete_sign_idx()];
            const auto& filter =
                    assert_cast<const vectorized::ColumnInt8*>(delete_filter_columns.get())
                            ->get_data();
            filtered = filter.size() - simd::count_zero_num((int8_t*)filter.data(), filter.size());
            total = filter.size();
        }
        if (filtered == total) {
            // Every fetched row is delete-marked: return an empty result.
            _result_block->clear_column_data();
        } else if (filtered > 0) {
            return Status::NotSupported("Not implemented since only single row at present");
        }
    }
    return Status::OK();
}
// Thrift-serialize the result batch into the RPC response's row_batch field.
Status serialize_block(std::shared_ptr<TFetchDataResult> res, PTabletKeyLookupResponse* response) {
    uint8_t* buf = nullptr;
    uint32_t len = 0;
    ThriftSerializer serializer(false, 4096);
    RETURN_IF_ERROR(serializer.serialize(&(res->result_batch), &len, &buf));
    response->set_row_batch(std::string(reinterpret_cast<const char*>(buf), len));
    return Status::OK();
}
// 4. Run the output exprs over the result block and serialize the rows into
// mysql row-batch format on the response; empty results are flagged instead.
Status PointQueryExecutor::_output_data() {
    SCOPED_TIMER(&_profile_metrics.output_data_ns);
    if (_result_block->rows()) {
        RuntimeState state;
        auto result_buffer = std::make_shared<PointQueryResultBlockBuffer>(&state);
        // TODO reuse mysql_writer
        vectorized::VMysqlResultWriter writer(result_buffer, _reusable->output_exprs(), nullptr,
                                              _binary_row_format);
        RETURN_IF_ERROR(writer.init(_reusable->runtime_state()));
        _result_block->clear_names();
        RETURN_IF_ERROR(writer.write(_reusable->runtime_state(), *_result_block));
        RETURN_IF_ERROR(serialize_block(result_buffer->get_block(), _response));
        VLOG_DEBUG << "dump block " << _result_block->dump_data();
    } else {
        _response->set_empty_batch(true);
    }
    _profile_metrics.result_data_bytes = _result_block->bytes();
    // Hand the block back to the pool for the next query on this connection.
    _reusable->return_block(_result_block);
    return Status::OK();
}
#include "common/compile_check_end.h"
} // namespace doris