blob: 752e972c3e642fa9311defd6a30d50becda3709b [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/operator/data_queue.h"
#include <glog/logging.h>
#include <algorithm>
#include <utility>
#include "common/thread_safety_annotations.h"
#include "core/block/block.h"
#include "exec/pipeline/dependency.h"
namespace doris {
// Removes the oldest buffered block, if there is one, and transfers its ownership to
// *output_block. When the sub-queue is empty, *output_block is left untouched.
// The byte/block counters are reduced accordingly, and once the last buffered block has been
// taken the sink dependency is marked ready again.
void SubQueue::try_pop(std::unique_ptr<Block>* output_block) {
    LockGuard guard(queue_lock);
    if (blocks.empty()) {
        return;
    }
    auto& head = blocks.front();
    *output_block = std::move(head);
    blocks.pop_front();
    bytes_in_queue -= (*output_block)->allocated_bytes();
    --blocks_in_queue;
    // Everything buffered has been consumed: allow the producer side to push again.
    if (blocks.empty()) {
        sink_dependency->set_ready();
    }
}
// Appends `block` to this sub-queue unless the sub-queue has already been finished.
// Returns false (and lets `block` be destroyed) when finished; otherwise bumps `total_counter`
// while still holding queue_lock, enqueues the block, updates the local counters and blocks the
// sink dependency once more than max_blocks_in_queue blocks are buffered. Returns true then.
bool SubQueue::try_push(std::unique_ptr<Block> block, std::atomic_uint32_t& total_counter) {
    LockGuard guard(queue_lock);
    if (is_finished) {
        return false;
    }
    // Counted under queue_lock so the consumer can never pop a block that is not yet counted.
    ++total_counter;
    const auto block_bytes = block->allocated_bytes();
    blocks.push_back(std::move(block));
    bytes_in_queue += block_bytes;
    ++blocks_in_queue;
    const bool over_limit = static_cast<int64_t>(blocks.size()) > max_blocks_in_queue.load();
    if (over_limit) {
        sink_dependency->block();
    }
    return true;
}
// Marks this sub-queue as finished. Only the first call has an effect and returns true;
// repeated calls return false. The first call decrements `unfinished_counter`, and when that
// drops the counter to zero (previous value 1) it also raises `all_finished`.
bool SubQueue::mark_finished(std::atomic_uint32_t& unfinished_counter,
                             std::atomic_bool& all_finished) {
    LockGuard guard(queue_lock);
    if (is_finished) {
        return false;
    }
    is_finished = true;
    // fetch_sub returns the value before the decrement.
    const auto unfinished_before = unfinished_counter.fetch_sub(1);
    if (unfinished_before == 1) {
        all_finished = true;
    }
    return true;
}
void SubQueue::clear_blocks() {
bool need_set_always_ready = false;
{
LockGuard l(queue_lock);
if (!blocks.empty()) {
blocks.clear();
bytes_in_queue = 0;
blocks_in_queue = 0;
need_set_always_ready = true;
}
}
// Notify outside of queue_lock to keep lock ordering simple.
if (need_set_always_ready) {
sink_dependency->set_always_ready();
}
}
// Creates one SubQueue per child and records that all `child_count` children are unfinished.
DataQueue::DataQueue(int child_count) : _sub_queues(child_count), _child_count(child_count) {
    std::generate(_sub_queues.begin(), _sub_queues.end(),
                  [] { return std::make_unique<SubQueue>(); });
    _un_finished_counter = child_count;
}
// Returns true while the global count of enqueued-but-not-yet-popped blocks is non-zero.
// NOTE(review): terminate() clears sub-queues without decrementing this counter, so this may
// still report true after terminate() — confirm that callers do not rely on it afterwards.
bool DataQueue::has_more_data() const {
    const auto pending = _cur_blocks_total_nums.load();
    return pending != 0;
}
// Installs the dependency that set_source_ready()/set_source_block() toggle.
// _source_lock is not taken here and thread-safety analysis is disabled for this function;
// NOTE(review): presumably called before producers/consumers start running — confirm.
void DataQueue::set_source_dependency(std::shared_ptr<Dependency> source_dependency)
        NO_THREAD_SAFETY_ANALYSIS {
    _source_dependency = std::move(source_dependency);
}
// Attaches the (non-owning) sink dependency of child `child_idx` to its sub-queue; the
// sub-queue blocks / readies it as its buffer fills and drains.
void DataQueue::set_sink_dependency(Dependency* sink_dependency, int child_idx) {
    auto& target = *_sub_queues[child_idx];
    target.sink_dependency = sink_dependency;
}
// Applies the same buffering limit to every sub-queue. A sub-queue blocks its sink once it
// holds more than `max_blocks` blocks.
void DataQueue::set_max_blocks_in_sub_queue(int64_t max_blocks) {
    std::for_each(_sub_queues.begin(), _sub_queues.end(),
                  [max_blocks](auto& sub) { sub->max_blocks_in_queue = max_blocks; });
}
void DataQueue::set_low_memory_mode() {
_is_low_memory_mode = true;
for (auto& sub : _sub_queues) {
sub->max_blocks_in_queue = 1;
}
clear_free_blocks();
}
// Returns a recycled block from child `child_idx`'s free list when one is cached; otherwise
// allocates a fresh block. The allocation happens after free_lock has been released.
std::unique_ptr<Block> DataQueue::get_free_block(int child_idx) {
    auto& sub = *_sub_queues[child_idx];
    {
        LockGuard guard(sub.free_lock);
        if (!sub.free_blocks.empty()) {
            std::unique_ptr<Block> recycled = std::move(sub.free_blocks.front());
            sub.free_blocks.pop_front();
            return recycled;
        }
    }
    // Nothing cached for this child.
    return Block::create_unique();
}
// Returns an emptied block to child `child_idx`'s free list so get_free_block() can reuse it.
// In low-memory mode the block is not cached and is released when this function returns.
// A null block is ignored, matching push_block(). Otherwise:
// - debug builds would crash dereferencing it in the DCHECK;
// - release builds would cache it, and a later get_free_block() would hand out a null pointer.
void DataQueue::push_free_block(std::unique_ptr<Block> block, int child_idx) {
    if (!block) {
        return;
    }
    DCHECK(block->rows() == 0);
    if (_is_low_memory_mode) {
        return;
    }
    auto& sub = *_sub_queues[child_idx];
    LockGuard l(sub.free_lock);
    sub.free_blocks.emplace_back(std::move(block));
}
// Releases every cached free block of every sub-queue.
// Each free list is swapped into a local deque under free_lock, and that deque is destroyed
// after the lock is released. Previously the temporary was declared after the lock guard, so it
// was constructed and destroyed (freeing all cached blocks) while free_lock was still held.
void DataQueue::clear_free_blocks() {
    for (auto& sub : _sub_queues) {
        std::deque<std::unique_ptr<Block>> released;
        {
            LockGuard l(sub->free_lock);
            sub->free_blocks.swap(released);
        }
        // `released` and the blocks it owns are destroyed here, outside free_lock.
    }
}
// Shuts the queue down:
// - every child is marked finished, so later push_block() calls fail with EndOfFile;
// - blocks still buffered are discarded;
// - cached free blocks are released.
// NOTE(review): SubQueue::clear_blocks() does not decrement _cur_blocks_total_nums, so
// has_more_data() may keep returning true after this call — confirm that is intended.
void DataQueue::terminate() {
    for (int child = 0; child < _child_count; ++child) {
        // Finish before clearing: a finished sub-queue rejects try_push(), so nothing can be
        // enqueued behind the clear.
        set_finish(child);
        auto& sub = *_sub_queues[child];
        sub.clear_blocks();
    }
    clear_free_blocks();
}
// Round-robin scan for a sub-queue that currently holds blocks. The scan starts at the
// sub-queue after _flag_queue_idx, wraps around, and visits each sub-queue at most once.
// - On success it returns true and leaves _flag_queue_idx on the non-empty sub-queue. The next
//   get_block_from_queue() pops from there, and the next scan starts just after it.
// - If every sub-queue is empty it returns false, having advanced _flag_queue_idx a full cycle
//   back to where it started.
bool DataQueue::remaining_has_data() {
    for (int visited = 0; visited < _child_count; ++visited) {
        int next = _flag_queue_idx + 1;
        if (next == _child_count) {
            next = 0;
        }
        _flag_queue_idx = next;
        if (_sub_queues[next]->blocks_in_queue.load() > 0) {
            return true;
        }
    }
    return false;
}
// Pops at most one block from the sub-queue at _flag_queue_idx, which is expected to have been
// selected by a preceding remaining_has_data() call.
//   output_block: receives the popped block. When the selected sub-queue is empty it is left
//                 untouched, so callers that pass in a null pointer can test it afterwards.
//   child_idx:    optional; set to the source sub-queue index only when a block was popped.
// Always returns Status::OK().
Status DataQueue::get_block_from_queue(std::unique_ptr<Block>* output_block, int* child_idx) {
    const int idx = _flag_queue_idx;
    // Pop into a local rather than straight into *output_block: a non-null block supplied by
    // the caller must not be mistaken for a successful pop. Otherwise the unsigned
    // _cur_blocks_total_nums would be decremented (and wrap) although nothing was dequeued.
    std::unique_ptr<Block> popped;
    _sub_queues[idx]->try_pop(&popped);
    if (!popped) {
        return Status::OK();
    }
    *output_block = std::move(popped);
    if (child_idx) {
        *child_idx = idx;
    }
    // fetch_sub returns the previous value: 1 means this pop took the last queued block, so the
    // source may have to block until the next push (set_source_block() re-checks under lock).
    if (_cur_blocks_total_nums.fetch_sub(1) == 1) {
        set_source_block();
    }
    return Status::OK();
}
// Enqueues `block` on child `child_idx`'s sub-queue and wakes the source.
// A null block is accepted as a no-op. Returns EndOfFile if that child has already finished.
Status DataQueue::push_block(std::unique_ptr<Block> block, int child_idx) {
    if (block == nullptr) {
        return Status::OK();
    }
    // try_push() increments _cur_blocks_total_nums under queue_lock, and only when the block is
    // actually enqueued. A consumer that pops a block therefore always sees a counter >= 1, and
    // a rejected push needs no rollback.
    const bool enqueued =
            _sub_queues[child_idx]->try_push(std::move(block), _cur_blocks_total_nums);
    if (!enqueued) {
        return Status::EndOfFile("SubQueue already finished");
    }
    set_source_ready();
    return Status::OK();
}
void DataQueue::set_finish(int child_idx) {
auto& sub = *_sub_queues[child_idx];
if (!sub.mark_finished(_un_finished_counter, _is_all_finished)) {
return;
}
set_source_ready();
}
// True once every child has been marked finished. Blocks may still be buffered; callers combine
// this with the queue's data checks.
bool DataQueue::is_all_finish() {
    const bool finished = _is_all_finished.load();
    return finished;
}
// Marks the source dependency ready, if one is installed. This runs under _source_lock so it is
// ordered against set_source_block()'s re-check.
void DataQueue::set_source_ready() {
    LockGuard guard(_source_lock);
    if (!_source_dependency) {
        return;
    }
    _source_dependency->set_ready();
}
// Blocks the source dependency after the consumer drained the last queued block.
// The conditions are re-checked under _source_lock. A push that raced in after the counter was
// observed at zero, or all children having finished, keeps the source runnable.
void DataQueue::set_source_block() {
    LockGuard guard(_source_lock);
    if (!_source_dependency) {
        return;
    }
    const bool drained = _cur_blocks_total_nums.load() == 0;
    if (drained && !is_all_finish()) {
        _source_dependency->block();
    }
}
} // namespace doris