| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "runtime/buffer_control_block.h" |
| |
| #include <gen_cpp/Data_types.h> |
| #include <gen_cpp/PaloInternalService_types.h> |
| #include <gen_cpp/internal_service.pb.h> |
| #include <glog/logging.h> |
| #include <google/protobuf/stubs/callback.h> |
| // IWYU pragma: no_include <bits/chrono.h> |
| #include <chrono> // IWYU pragma: keep |
| #include <limits> |
| #include <ostream> |
| #include <string> |
| #include <utility> |
| #include <vector> |
| |
| #include "arrow/type_fwd.h" |
| #include "pipeline/dependency.h" |
| #include "runtime/thread_context.h" |
| #include "util/runtime_profile.h" |
| #include "util/thrift_util.h" |
| #include "vec/core/block.h" |
| |
| namespace doris { |
| |
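| // GetResultBatchCtx completes a pending FE "fetch data" RPC: each callback |
| // fills the protobuf result, runs the done closure, and then frees this |
| // heap-allocated context. |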
| void GetResultBatchCtx::on_failure(const Status& status) { |
| DCHECK(!status.ok()) << "status is ok, errmsg=" << status; |
| status.to_protobuf(result->mutable_status()); |
|     // Called by the result sink. |
|     done->Run(); |
| delete this; |
| } |
| |
| void GetResultBatchCtx::on_close(int64_t packet_seq, QueryStatistics* statistics) { |
| Status status; |
| status.to_protobuf(result->mutable_status()); |
| if (statistics != nullptr) { |
| statistics->to_pb(result->mutable_query_statistics()); |
| } |
| result->set_packet_seq(packet_seq); |
| result->set_eos(true); |
|     done->Run(); |
| delete this; |
| } |
| |
| void GetResultBatchCtx::on_data(const std::unique_ptr<TFetchDataResult>& t_result, |
| int64_t packet_seq, bool eos) { |
| Status st = Status::OK(); |
| if (t_result != nullptr) { |
| uint8_t* buf = nullptr; |
| uint32_t len = 0; |
| ThriftSerializer ser(false, 4096); |
| st = ser.serialize(&t_result->result_batch, &len, &buf); |
| if (st.ok()) { |
| result->set_row_batch(std::string((const char*)buf, len)); |
| result->set_packet_seq(packet_seq); |
| result->set_eos(eos); |
| } else { |
| LOG(WARNING) << "TFetchDataResult serialize failed, errmsg=" << st; |
| } |
| } else { |
| result->set_empty_batch(true); |
| result->set_packet_seq(packet_seq); |
| result->set_eos(eos); |
| } |
| |
|     /// The protobuf message size limit is 2GB. |
| if (result->ByteSizeLong() > std::numeric_limits<int32_t>::max()) { |
| st = Status::InternalError("Message size exceeds 2GB: {}", result->ByteSizeLong()); |
| result->clear_row_batch(); |
| result->set_empty_batch(true); |
| } |
| st.to_protobuf(result->mutable_status()); |
|     done->Run(); |
| delete this; |
| } |
| |
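| // GetArrowResultBatchCtx is the Arrow Flight counterpart: each callback fills |
| // the protobuf result and then frees this heap-allocated context. |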
| void GetArrowResultBatchCtx::on_failure(const Status& status) { |
| DCHECK(!status.ok()) << "status is ok, errmsg=" << status; |
| status.to_protobuf(result->mutable_status()); |
| delete this; |
| } |
| |
| void GetArrowResultBatchCtx::on_close(int64_t packet_seq) { |
| Status status; |
| status.to_protobuf(result->mutable_status()); |
| result->set_packet_seq(packet_seq); |
| result->set_eos(true); |
| delete this; |
| } |
| |
| void GetArrowResultBatchCtx::on_data( |
| const std::shared_ptr<vectorized::Block>& block, int64_t packet_seq, int be_exec_version, |
| segment_v2::CompressionTypePB fragement_transmission_compression_type, std::string timezone, |
| RuntimeProfile::Counter* serialize_batch_ns_timer, |
| RuntimeProfile::Counter* uncompressed_bytes_counter, |
| RuntimeProfile::Counter* compressed_bytes_counter) { |
| Status st = Status::OK(); |
|     if (block != nullptr) { |
| size_t uncompressed_bytes = 0, compressed_bytes = 0; |
| SCOPED_TIMER(serialize_batch_ns_timer); |
| st = block->serialize(be_exec_version, result->mutable_block(), &uncompressed_bytes, |
| &compressed_bytes, fragement_transmission_compression_type, false); |
| COUNTER_UPDATE(uncompressed_bytes_counter, uncompressed_bytes); |
| COUNTER_UPDATE(compressed_bytes_counter, compressed_bytes); |
| if (st.ok()) { |
| result->set_packet_seq(packet_seq); |
| result->set_eos(false); |
| if (packet_seq == 0) { |
| result->set_timezone(timezone); |
| } |
| } else { |
| result->clear_block(); |
| result->set_packet_seq(packet_seq); |
|             LOG(WARNING) << "Block serialize failed, errmsg=" << st; |
| } |
| } else { |
| result->set_empty_batch(true); |
| result->set_packet_seq(packet_seq); |
| result->set_eos(false); |
| } |
| |
|     /// The protobuf message size limit is 2GB. |
| if (result->ByteSizeLong() > std::numeric_limits<int32_t>::max()) { |
| st = Status::InternalError("Message size exceeds 2GB: {}", result->ByteSizeLong()); |
| result->clear_block(); |
| } |
| st.to_protobuf(result->mutable_status()); |
| delete this; |
| } |
| |
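| // BufferControlBlock buffers result batches produced by result sinks and hands |
| // them to FE fetch RPCs or the Arrow Flight reader, applying per-instance |
| // backpressure through the registered result sink dependencies. |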
| BufferControlBlock::BufferControlBlock(TUniqueId id, int buffer_size, RuntimeState* state) |
| : _fragment_id(std::move(id)), |
| _is_close(false), |
| _is_cancelled(false), |
| _buffer_limit(buffer_size), |
| _packet_num(0), |
| _batch_size(state->batch_size()), |
| _timezone(state->timezone()), |
| _timezone_obj(state->timezone_obj()), |
| _be_exec_version(state->be_exec_version()), |
| _fragement_transmission_compression_type( |
| state->fragement_transmission_compression_type()), |
| _profile("BufferControlBlock " + print_id(_fragment_id)) { |
| _query_statistics = std::make_unique<QueryStatistics>(); |
| _serialize_batch_ns_timer = ADD_TIMER(&_profile, "SerializeBatchNsTime"); |
| _uncompressed_bytes_counter = ADD_COUNTER(&_profile, "UncompressedBytes", TUnit::BYTES); |
| _compressed_bytes_counter = ADD_COUNTER(&_profile, "CompressedBytes", TUnit::BYTES); |
| _mem_tracker = MemTrackerLimiter::create_shared( |
| MemTrackerLimiter::Type::QUERY, |
| fmt::format("BufferControlBlock#FragmentInstanceId={}", print_id(_fragment_id))); |
| } |
| |
| BufferControlBlock::~BufferControlBlock() { |
| cancel(); |
| } |
| |
| Status BufferControlBlock::init() { |
| return Status::OK(); |
| } |
| |
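| // Called by a result sink to hand over a row batch. If an RPC is already |
| // waiting, reply to it directly; otherwise queue the batch, merging it into the |
| // last queued batch when the combined row count stays under the buffer limit |
| // and the incoming batch is not EOS. Per-instance row counts feed the |
| // backpressure logic in _update_dependency(). |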
| Status BufferControlBlock::add_batch(RuntimeState* state, |
| std::unique_ptr<TFetchDataResult>& result) { |
| std::unique_lock<std::mutex> l(_lock); |
| |
| if (_is_cancelled) { |
| return Status::Cancelled("Cancelled"); |
| } |
| |
| int num_rows = result->result_batch.rows.size(); |
| if (_waiting_rpc.empty()) { |
|         // Merge the new rows into the last queued batch to reduce the number of RPCs. |
| if (!_fe_result_batch_queue.empty() && |
| ((_fe_result_batch_queue.back()->result_batch.rows.size() + num_rows) < |
| _buffer_limit) && |
| !result->eos) { |
| std::vector<std::string>& back_rows = _fe_result_batch_queue.back()->result_batch.rows; |
| std::vector<std::string>& result_rows = result->result_batch.rows; |
| back_rows.insert(back_rows.end(), std::make_move_iterator(result_rows.begin()), |
| std::make_move_iterator(result_rows.end())); |
| } else { |
| _instance_rows_in_queue.emplace_back(); |
| _fe_result_batch_queue.push_back(std::move(result)); |
| } |
| _instance_rows[state->fragment_instance_id()] += num_rows; |
| _instance_rows_in_queue.back()[state->fragment_instance_id()] += num_rows; |
| } else { |
| auto* ctx = _waiting_rpc.front(); |
| _waiting_rpc.pop_front(); |
| ctx->on_data(result, _packet_num); |
| _packet_num++; |
| } |
| |
| _update_dependency(); |
| return Status::OK(); |
| } |
| |
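| // Arrow Flight counterpart of add_batch(): queue the block (waking any thread |
| // blocked in the synchronous get_arrow_batch()) or reply to a waiting RPC. |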
| Status BufferControlBlock::add_arrow_batch(RuntimeState* state, |
| std::shared_ptr<vectorized::Block>& result) { |
| std::unique_lock<std::mutex> l(_lock); |
| |
| if (_is_cancelled) { |
| return Status::Cancelled("Cancelled"); |
| } |
| |
| if (_waiting_arrow_result_batch_rpc.empty()) { |
|         // TODO: Merge the block into the last queued one to reduce the number of RPCs. |
| int num_rows = result->rows(); |
| _arrow_flight_result_batch_queue.push_back(std::move(result)); |
| _instance_rows_in_queue.emplace_back(); |
| _instance_rows[state->fragment_instance_id()] += num_rows; |
| _instance_rows_in_queue.back()[state->fragment_instance_id()] += num_rows; |
|         // Only relevant for the blocking get_arrow_batch(std::shared_ptr<vectorized::Block>*, ...). |
|         _arrow_data_arrival.notify_one(); |
| } else { |
| auto* ctx = _waiting_arrow_result_batch_rpc.front(); |
| _waiting_arrow_result_batch_rpc.pop_front(); |
| ctx->on_data(result, _packet_num, _be_exec_version, |
| _fragement_transmission_compression_type, _timezone, _serialize_batch_ns_timer, |
| _uncompressed_bytes_counter, _compressed_bytes_counter); |
| _packet_num++; |
| } |
| |
| _update_dependency(); |
| return Status::OK(); |
| } |
| |
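| // Serves an FE fetch RPC: reply with queued data, an error, or EOS when the |
| // block is already closed; otherwise park the context until data arrives. |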
| void BufferControlBlock::get_batch(GetResultBatchCtx* ctx) { |
| std::lock_guard<std::mutex> l(_lock); |
| Defer defer {[&]() { _update_dependency(); }}; |
| if (!_status.ok()) { |
| ctx->on_failure(_status); |
| return; |
| } |
| if (_is_cancelled) { |
| ctx->on_failure(Status::Cancelled("Cancelled")); |
| return; |
| } |
| if (!_fe_result_batch_queue.empty()) { |
| // get result |
| std::unique_ptr<TFetchDataResult> result = std::move(_fe_result_batch_queue.front()); |
| _fe_result_batch_queue.pop_front(); |
|         for (const auto& it : _instance_rows_in_queue.front()) { |
| _instance_rows[it.first] -= it.second; |
| } |
| _instance_rows_in_queue.pop_front(); |
| |
| ctx->on_data(result, _packet_num); |
| _packet_num++; |
| return; |
| } |
| if (_is_close) { |
| ctx->on_close(_packet_num, _query_statistics.get()); |
| return; |
| } |
| // no ready data, push ctx to waiting list |
| _waiting_rpc.push_back(ctx); |
| } |
| |
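| // Blocking fetch used by the Arrow Flight reader: waits, polling every 20ms, |
| // until a block is queued, the block is closed, or the query is cancelled. |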
| Status BufferControlBlock::get_arrow_batch(std::shared_ptr<vectorized::Block>* result, |
| cctz::time_zone& timezone_obj) { |
| std::unique_lock<std::mutex> l(_lock); |
| Defer defer {[&]() { _update_dependency(); }}; |
| if (!_status.ok()) { |
| return _status; |
| } |
| if (_is_cancelled) { |
|         return Status::Cancelled(fmt::format("Cancelled {}", print_id(_fragment_id))); |
| } |
| |
| while (_arrow_flight_result_batch_queue.empty() && !_is_cancelled && !_is_close) { |
| _arrow_data_arrival.wait_for(l, std::chrono::milliseconds(20)); |
| } |
| |
| if (!_status.ok()) { |
| return _status; |
| } |
| if (_is_cancelled) { |
|         return Status::Cancelled(fmt::format("Cancelled {}", print_id(_fragment_id))); |
| } |
| |
| if (!_arrow_flight_result_batch_queue.empty()) { |
| *result = std::move(_arrow_flight_result_batch_queue.front()); |
| _arrow_flight_result_batch_queue.pop_front(); |
| timezone_obj = _timezone_obj; |
| |
|         for (const auto& it : _instance_rows_in_queue.front()) { |
| _instance_rows[it.first] -= it.second; |
| } |
| _instance_rows_in_queue.pop_front(); |
| _packet_num++; |
| return Status::OK(); |
| } |
| |
|     // Normal end of stream: the block is closed and the queue has been drained. |
| if (_is_close) { |
| if (!_status.ok()) { |
| return _status; |
| } |
| std::stringstream ss; |
| _profile.pretty_print(&ss); |
| LOG(INFO) << fmt::format( |
| "BufferControlBlock finished, fragment_id={}, is_close={}, is_cancelled={}, " |
| "packet_num={}, peak_memory_usage={}, profile={}", |
| print_id(_fragment_id), _is_close, _is_cancelled, _packet_num, |
| _mem_tracker->peak_consumption(), ss.str()); |
| return Status::OK(); |
| } |
|     return Status::InternalError( |
|             fmt::format("Get Arrow Batch Abnormal Ending {}, {}", print_id(_fragment_id), _status)); |
| } |
| |
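| // Asynchronous fetch for the Arrow Flight RPC path; mirrors get_batch() but |
| // replies through a GetArrowResultBatchCtx. |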
| void BufferControlBlock::get_arrow_batch(GetArrowResultBatchCtx* ctx) { |
| std::unique_lock<std::mutex> l(_lock); |
| SCOPED_ATTACH_TASK(_mem_tracker); |
| Defer defer {[&]() { _update_dependency(); }}; |
| if (!_status.ok()) { |
| ctx->on_failure(_status); |
| return; |
| } |
| if (_is_cancelled) { |
|         ctx->on_failure(Status::Cancelled(fmt::format("Cancelled {}", print_id(_fragment_id)))); |
| return; |
| } |
| |
| if (!_arrow_flight_result_batch_queue.empty()) { |
| auto block = _arrow_flight_result_batch_queue.front(); |
| _arrow_flight_result_batch_queue.pop_front(); |
|         for (const auto& it : _instance_rows_in_queue.front()) { |
| _instance_rows[it.first] -= it.second; |
| } |
| _instance_rows_in_queue.pop_front(); |
| |
| ctx->on_data(block, _packet_num, _be_exec_version, _fragement_transmission_compression_type, |
| _timezone, _serialize_batch_ns_timer, _uncompressed_bytes_counter, |
| _compressed_bytes_counter); |
| _packet_num++; |
| return; |
| } |
| |
|     // Normal end of stream: the block is closed and the queue has been drained. |
| if (_is_close) { |
| if (!_status.ok()) { |
| ctx->on_failure(_status); |
| return; |
| } |
| ctx->on_close(_packet_num); |
| std::stringstream ss; |
| _profile.pretty_print(&ss); |
| LOG(INFO) << fmt::format( |
| "BufferControlBlock finished, fragment_id={}, is_close={}, is_cancelled={}, " |
| "packet_num={}, peak_memory_usage={}, profile={}", |
| print_id(_fragment_id), _is_close, _is_cancelled, _packet_num, |
| _mem_tracker->peak_consumption(), ss.str()); |
| return; |
| } |
| // no ready data, push ctx to waiting list |
| _waiting_arrow_result_batch_rpc.push_back(ctx); |
| } |
| |
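| // The Arrow schema is registered by the result sink and retrieved by the Arrow |
| // Flight reader through find_arrow_schema(). |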
| void BufferControlBlock::register_arrow_schema(const std::shared_ptr<arrow::Schema>& arrow_schema) { |
| std::lock_guard<std::mutex> l(_lock); |
| _arrow_schema = arrow_schema; |
| } |
| |
| Status BufferControlBlock::find_arrow_schema(std::shared_ptr<arrow::Schema>* arrow_schema) { |
| std::unique_lock<std::mutex> l(_lock); |
| if (!_status.ok()) { |
| return _status; |
| } |
| if (_is_cancelled) { |
|         return Status::Cancelled(fmt::format("Cancelled {}", print_id(_fragment_id))); |
| } |
| |
|     // Normal path: the schema has been registered. |
| if (_arrow_schema != nullptr) { |
| *arrow_schema = _arrow_schema; |
| return Status::OK(); |
| } |
| |
| if (_is_close) { |
|         return Status::RuntimeError(fmt::format("Closed {}", print_id(_fragment_id))); |
| } |
|     return Status::InternalError(fmt::format("Get Arrow Schema Abnormal Ending {}, {}", |
|                                              print_id(_fragment_id), _status)); |
| } |
| |
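| // Called once per result sink instance; the block is only marked closed after |
| // the last sink has called in. Any error status is recorded, and pending RPCs |
| // are then answered with EOS or the failure. |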
| Status BufferControlBlock::close(const TUniqueId& id, Status exec_status) { |
| std::unique_lock<std::mutex> l(_lock); |
| // close will be called multiple times and error status needs to be collected. |
| if (!exec_status.ok()) { |
| _status = exec_status; |
| } |
| |
| auto it = _result_sink_dependencys.find(id); |
| if (it != _result_sink_dependencys.end()) { |
| it->second->set_always_ready(); |
| _result_sink_dependencys.erase(it); |
| } |
| if (!_result_sink_dependencys.empty()) { |
| return Status::OK(); |
| } |
| |
| _is_close = true; |
| _arrow_data_arrival.notify_all(); |
| |
| if (!_waiting_rpc.empty()) { |
| if (_status.ok()) { |
| for (auto& ctx : _waiting_rpc) { |
| ctx->on_close(_packet_num, _query_statistics.get()); |
| } |
| } else { |
| for (auto& ctx : _waiting_rpc) { |
| ctx->on_failure(_status); |
| } |
| } |
| _waiting_rpc.clear(); |
| } |
| |
| if (!_waiting_arrow_result_batch_rpc.empty()) { |
| if (_status.ok()) { |
| for (auto& ctx : _waiting_arrow_result_batch_rpc) { |
| ctx->on_close(_packet_num); |
| } |
| } else { |
| for (auto& ctx : _waiting_arrow_result_batch_rpc) { |
| ctx->on_failure(_status); |
| } |
| } |
| _waiting_arrow_result_batch_rpc.clear(); |
| } |
| return Status::OK(); |
| } |
| |
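| // Cancels the block (also invoked from the destructor): fail all pending RPCs |
| // and drop any queued Arrow results. |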
| void BufferControlBlock::cancel() { |
| std::unique_lock<std::mutex> l(_lock); |
| SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker); |
| _is_cancelled = true; |
| _arrow_data_arrival.notify_all(); |
| for (auto& ctx : _waiting_rpc) { |
| ctx->on_failure(Status::Cancelled("Cancelled")); |
| } |
| _waiting_rpc.clear(); |
| for (auto& ctx : _waiting_arrow_result_batch_rpc) { |
| ctx->on_failure(Status::Cancelled("Cancelled")); |
| } |
| _waiting_arrow_result_batch_rpc.clear(); |
| _update_dependency(); |
| _arrow_flight_result_batch_queue.clear(); |
| } |
| |
| void BufferControlBlock::set_dependency( |
| const TUniqueId& id, std::shared_ptr<pipeline::Dependency> result_sink_dependency) { |
| std::unique_lock<std::mutex> l(_lock); |
| _result_sink_dependencys[id] = result_sink_dependency; |
| _update_dependency(); |
| } |
| |
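| // Backpressure: block a result sink's dependency while its fragment instance |
| // has more queued rows than the batch size, otherwise mark it ready. The |
| // caller must hold _lock. |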
| void BufferControlBlock::_update_dependency() { |
| if (_is_cancelled) { |
|         for (const auto& it : _result_sink_dependencys) { |
| it.second->set_ready(); |
| } |
| return; |
| } |
| |
|     for (const auto& it : _result_sink_dependencys) { |
| if (_instance_rows[it.first] > _batch_size) { |
| it.second->block(); |
| } else { |
| it.second->set_ready(); |
| } |
| } |
| } |
| |
| } // namespace doris |