blob: 47af430a17cc59ce0e55630c27c839627765ec0e [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/result_block_buffer.h"
#include <gen_cpp/Data_types.h>
#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/internal_service.pb.h>
#include <glog/logging.h>
#include <google/protobuf/stubs/callback.h>
// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
#include <limits>
#include <ostream>
#include <string>
#include <utility>
#include <vector>
#include "arrow/type_fwd.h"
#include "common/config.h"
#include "pipeline/dependency.h"
#include "runtime/thread_context.h"
#include "util/runtime_profile.h"
#include "util/thrift_util.h"
#include "vec/core/block.h"
#include "vec/sink/varrow_flight_result_writer.h"
#include "vec/sink/vmysql_result_writer.h"
namespace doris {
template <typename ResultCtxType>
ResultBlockBuffer<ResultCtxType>::ResultBlockBuffer(TUniqueId id, RuntimeState* state,
                                                    int buffer_size)
        : _fragment_id(std::move(id)),
          _is_close(false),
          _batch_size(state->batch_size()),
          _timezone(state->timezone()),
          _be_exec_version(state->be_exec_version()),
          _fragment_transmission_compression_type(state->fragement_transmission_compression_type()),
          _buffer_limit(buffer_size) {
    // Each buffer owns a QUERY-type memory tracker whose label identifies the fragment
    // instance this buffer serves.
    const std::string tracker_label =
            fmt::format("ResultBlockBuffer#FragmentInstanceId={}", print_id(_fragment_id));
    _mem_tracker =
            MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::QUERY, tracker_label);
}
template <typename ResultCtxType>
Status ResultBlockBuffer<ResultCtxType>::close(const TUniqueId& id, Status exec_status,
                                               int64_t num_rows) {
    std::unique_lock<std::mutex> guard(_lock);
    _returned_rows.fetch_add(num_rows);
    // Every sink instance calls close() on its own; a failing instance's status is recorded
    // so that it is what gets reported (the most recent error wins).
    if (!exec_status.ok()) {
        _status = exec_status;
    }

    // Retire this instance's dependency; closing an unknown instance is itself an error.
    auto dep_it = _result_sink_dependencies.find(id);
    if (dep_it == _result_sink_dependencies.end()) {
        _status = Status::InternalError("Instance {} is not found in ResultBlockBuffer",
                                        print_id(id));
    } else {
        dep_it->second->set_always_ready();
        _result_sink_dependencies.erase(dep_it);
    }

    // The buffer only finishes once the last registered instance has closed.
    if (!_result_sink_dependencies.empty()) {
        return _status;
    }

    _is_close = true;
    _arrow_data_arrival.notify_all();

    // Complete every parked fetch: end-of-stream on success, the collected error otherwise.
    const bool finished_ok = _status.ok();
    for (auto& pending : _waiting_rpc) {
        if (finished_ok) {
            pending->on_close(_packet_num, _returned_rows);
        } else {
            pending->on_failure(_status);
        }
    }
    _waiting_rpc.clear();
    return _status;
}
template <typename ResultCtxType>
void ResultBlockBuffer<ResultCtxType>::cancel(const Status& reason) {
    std::unique_lock<std::mutex> l(_lock);
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker);
    // Keep the first recorded error; a later cancel must not mask the original cause.
    if (_status.ok()) {
        _status = reason;
    }
    _arrow_data_arrival.notify_all();
    // Fail every fetch RPC that is parked waiting for data.
    for (auto& ctx : _waiting_rpc) {
        ctx->on_failure(reason);
    }
    _waiting_rpc.clear();
    // When _status holds an error this marks every sink dependency ready, so producers
    // are not left blocked on back-pressure.
    _update_dependency();
    // Drop the queued batches together with their per-instance row bookkeeping.
    // get_batch()/add_batch() treat _result_batch_queue and _instance_rows_in_queue as
    // parallel queues (one entry per batch), so clearing only one of them would desync
    // the accounting for any batch queued afterwards.
    _result_batch_queue.clear();
    _instance_rows_in_queue.clear();
}
template <typename ResultCtxType>
void ResultBlockBuffer<ResultCtxType>::set_dependency(
        const TUniqueId& id, std::shared_ptr<pipeline::Dependency> result_sink_dependency) {
    // Register (or replace) the sink dependency for instance `id`, then immediately
    // recompute its ready/blocked state against the current buffer contents.
    std::lock_guard<std::mutex> guard(_lock);
    _result_sink_dependencies[id] = std::move(result_sink_dependency);
    _update_dependency();
}
// Recompute back-pressure for every registered sink instance.
// Every caller in this file invokes it while holding _lock.
//  - On error (_status not OK) all sinks are made ready so they can run and see the failure.
//  - Otherwise a sink is blocked while its rows still sitting in the queue exceed _batch_size.
template <typename ResultCtxType>
void ResultBlockBuffer<ResultCtxType>::_update_dependency() {
    if (!_status.ok()) {
        // Iterate by reference: copying the map entry would copy the shared_ptr
        // (atomic refcount traffic) on every iteration.
        for (const auto& entry : _result_sink_dependencies) {
            entry.second->set_ready();
        }
        return;
    }
    for (const auto& [instance_id, dependency] : _result_sink_dependencies) {
        // Read-only lookup: operator[] would insert a zero entry for instances that
        // have not queued any rows yet. A missing entry means zero queued rows.
        const auto rows_it = _instance_rows.find(instance_id);
        const bool over_limit =
                rows_it != _instance_rows.end() && rows_it->second > _batch_size;
        if (over_limit) {
            dependency->block();
        } else {
            dependency->set_ready();
        }
    }
}
// Serve one fetch request `ctx`:
//  - buffer failed      -> ctx->on_failure(_status), returns _status;
//  - a batch is queued  -> deliver the oldest batch via ctx->on_data();
//  - buffer closed      -> ctx->on_close() (end of stream);
//  - otherwise          -> park ctx in _waiting_rpc until add_batch()/close()/cancel().
// Sink back-pressure is recomputed on every exit path.
template <typename ResultCtxType>
Status ResultBlockBuffer<ResultCtxType>::get_batch(std::shared_ptr<ResultCtxType> ctx) {
    std::lock_guard<std::mutex> l(_lock);
    SCOPED_ATTACH_TASK(_mem_tracker);
    Defer defer {[&]() { _update_dependency(); }};
    if (!_status.ok()) {
        ctx->on_failure(_status);
        return _status;
    }
    if (!_result_batch_queue.empty()) {
        // Take the front batch by move; the slot is popped right away, so no copy is needed.
        auto result = std::move(_result_batch_queue.front());
        _result_batch_queue.pop_front();
        // Release the per-instance row counts charged when this batch was queued.
        for (const auto& [instance_id, rows] : _instance_rows_in_queue.front()) {
            _instance_rows[instance_id] -= rows;
        }
        _instance_rows_in_queue.pop_front();
        RETURN_IF_ERROR(ctx->on_data(result, _packet_num, this));
        _packet_num++;
        return Status::OK();
    }
    if (_is_close) {
        // _status was checked above under the same lock and nothing here modifies it,
        // so the buffer finished normally: signal end of stream.
        ctx->on_close(_packet_num, _returned_rows);
        LOG(INFO) << fmt::format(
                "ResultBlockBuffer finished, fragment_id={}, is_close={}, is_cancelled={}, "
                "packet_num={}, peak_memory_usage={}",
                print_id(_fragment_id), _is_close, !_status.ok(), _packet_num,
                _mem_tracker->peak_consumption());
        return Status::OK();
    }
    // No data ready yet: park the request until a producer or close() completes it.
    _waiting_rpc.push_back(std::move(ctx));
    return Status::OK();
}
// Accept one result batch from the sink of fragment instance `state->fragment_instance_id()`.
// If a fetch RPC is already parked, the batch is handed to it directly. Otherwise the batch is
// queued: it is appended to the last queued batch when the combined row count stays below
// _buffer_limit and the combined byte size stays within config::thrift_max_message_size,
// and it becomes a new queue entry otherwise. Per-instance queued-row counts drive
// back-pressure (see _update_dependency()).
// NOTE: `result` may be moved from (pointer or row contents) by this call.
template <typename ResultCtxType>
Status ResultBlockBuffer<ResultCtxType>::add_batch(RuntimeState* state,
                                                   std::shared_ptr<InBlockType>& result) {
    std::unique_lock<std::mutex> l(_lock);
    if (!_status.ok()) {
        return _status;
    }

    if (!_waiting_rpc.empty()) {
        // A client is already waiting: deliver immediately, bypassing the queue.
        auto ctx = _waiting_rpc.front();
        _waiting_rpc.pop_front();
        RETURN_IF_ERROR(ctx->on_data(result, _packet_num, this));
        _packet_num++;
        _update_dependency();
        return Status::OK();
    }

    auto num_rows = 0;
    size_t batch_size = 0;
    if constexpr (std::is_same_v<InBlockType, vectorized::Block>) {
        num_rows = result->rows();
        batch_size = result->bytes();
    } else if constexpr (std::is_same_v<InBlockType, TFetchDataResult>) {
        num_rows = result->result_batch.rows.size();
        for (const auto& row : result->result_batch.rows) {
            batch_size += row.size();
        }
    }

    bool merged = false;
    if (!_result_batch_queue.empty()) {
        auto sz = 0;
        if constexpr (std::is_same_v<InBlockType, vectorized::Block>) {
            sz = _result_batch_queue.back()->rows();
        } else if constexpr (std::is_same_v<InBlockType, TFetchDataResult>) {
            sz = _result_batch_queue.back()->result_batch.rows.size();
        }
        if (sz + num_rows < _buffer_limit &&
            (batch_size + _last_batch_bytes) <= config::thrift_max_message_size) {
            if constexpr (std::is_same_v<InBlockType, vectorized::Block>) {
                auto& last_block = _result_batch_queue.back();
                // Obtain the mutable columns once and write them back afterwards.
                // Calling mutate_columns() once per column is quadratic in the column count.
                // NOTE(review): with copy-on-write columns, mutate_columns() may return a
                // clone of a shared column; appending into a discarded temporary would then
                // lose rows, so the columns are stored back with set_columns().
                auto columns = last_block->mutate_columns();
                for (size_t i = 0; i < columns.size(); i++) {
                    columns[i]->insert_range_from(*result->get_by_position(i).column, 0,
                                                  num_rows);
                }
                last_block->set_columns(std::move(columns));
            } else {
                std::vector<std::string>& back_rows =
                        _result_batch_queue.back()->result_batch.rows;
                std::vector<std::string>& result_rows = result->result_batch.rows;
                back_rows.insert(back_rows.end(), std::make_move_iterator(result_rows.begin()),
                                 std::make_move_iterator(result_rows.end()));
            }
            _last_batch_bytes += batch_size;
            merged = true;
        }
    }

    if (!merged) {
        // Start a new queue entry, with an empty per-instance row map alongside it.
        _instance_rows_in_queue.emplace_back();
        _result_batch_queue.push_back(std::move(result));
        _last_batch_bytes = batch_size;
        _arrow_data_arrival
                .notify_one(); // Only valid for get_arrow_batch(std::shared_ptr<vectorized::Block>,)
    }

    // Charge the rows to this instance, both in total and on the batch that now holds them.
    const auto& instance_id = state->fragment_instance_id();
    _instance_rows[instance_id] += num_rows;
    _instance_rows_in_queue.back()[instance_id] += num_rows;
    _update_dependency();
    return Status::OK();
}
// Explicit instantiations: the template member definitions live in this .cpp file, so the
// buffer is compiled here for the Arrow Flight and the regular (Thrift) result contexts.
template class ResultBlockBuffer<vectorized::GetArrowResultBatchCtx>;
template class ResultBlockBuffer<vectorized::GetResultBatchCtx>;
} // namespace doris