blob: d2c8b8b017139be6ab71ad26b9d917240358ff13 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "data_queue.h"

#include <glog/logging.h>

#include <algorithm>
#include <memory>
#include <mutex>
#include <utility>

#include "pipeline/dependency.h"
#include "vec/core/block.h"
namespace doris {
namespace pipeline {
#include "common/compile_check_begin.h"
// Builds one sub-queue per child: a queue of pending blocks, a free list of
// reusable blocks, a mutex guarding each of them, and per-child bookkeeping.
// Every child starts neither finished nor canceled with empty counters, the
// unfinished-children counter starts at child_count, and every sink dependency
// slot starts as nullptr.
DataQueue::DataQueue(int child_count)
        : _queue_blocks_lock(child_count),
          _queue_blocks(child_count),
          _free_blocks_lock(child_count),
          _free_blocks(child_count),
          _child_count(child_count),
          _is_finished(child_count),
          _is_canceled(child_count),
          _cur_bytes_in_queue(child_count),
          _cur_blocks_nums_in_queue(child_count),
          _flag_queue_idx(0) {
    for (int i = 0; i < child_count; ++i) {
        // Own the per-child mutexes through make_unique instead of a naked new.
        _queue_blocks_lock[i] = std::make_unique<std::mutex>();
        _free_blocks_lock[i] = std::make_unique<std::mutex>();
        _is_finished[i] = false;
        _is_canceled[i] = false;
        _cur_bytes_in_queue[i] = 0;
        _cur_blocks_nums_in_queue[i] = 0;
    }
    _un_finished_counter = child_count;
    _sink_dependencies.resize(child_count, nullptr);
}
std::unique_ptr<vectorized::Block> DataQueue::get_free_block(int child_idx) {
{
INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(*_free_blocks_lock[child_idx]));
if (!_free_blocks[child_idx].empty()) {
auto block = std::move(_free_blocks[child_idx].front());
_free_blocks[child_idx].pop_front();
return block;
}
}
return vectorized::Block::create_unique();
}
// Returns an emptied block (DCHECKed to hold zero rows) to child_idx's free
// list so get_free_block() can reuse it. In low-memory mode nothing is cached:
// the block is simply destroyed when this function returns.
void DataQueue::push_free_block(std::unique_ptr<vectorized::Block> block, int child_idx) {
    DCHECK(block->rows() == 0);
    if (_is_low_memory_mode) {
        return;
    }
    INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> guard(*_free_blocks_lock[child_idx]));
    _free_blocks[child_idx].emplace_back(std::move(block));
}
// Drops every cached free block of every child, one child at a time under
// that child's free-list lock.
void DataQueue::clear_free_blocks() {
    const size_t queue_count = _free_blocks.size();
    for (size_t idx = 0; idx < queue_count; ++idx) {
        std::lock_guard<std::mutex> guard(*_free_blocks_lock[idx]);
        // Swap with an empty temporary rather than calling clear(), so the
        // deque's own storage is released too. The old blocks are destroyed
        // at the end of this statement, while the lock is still held.
        std::deque<std::unique_ptr<vectorized::Block>>().swap(_free_blocks[idx]);
    }
}
// Round-robin probe for a sub-queue that still holds blocks.
// Starting with the queue after the one currently recorded in _flag_queue_idx
// (wrapping back to 0), each of the _child_count queues is checked at most
// once. The first non-empty one is left in _flag_queue_idx, which
// get_block_from_queue() reads, and true is returned. Starting after the last
// recorded index spreads reads across queues when there are several of them.
// Returns false if no queue has pending blocks.
bool DataQueue::remaining_has_data() {
    for (int probe = 0; probe < _child_count; ++probe) {
        ++_flag_queue_idx;
        if (_flag_queue_idx == _child_count) {
            _flag_queue_idx = 0;
        }
        if (_cur_blocks_nums_in_queue[_flag_queue_idx] > 0) {
            return true;
        }
    }
    return false;
}
// Pops the front block of the sub-queue currently selected by _flag_queue_idx.
// remaining_has_data() advances that index to a non-empty sub-queue.
//
// output_block: receives the popped block. It is left untouched if the
//               selected sub-queue turned out to be empty.
// child_idx:    optional. When non-null and a block was popped, it receives the
//               index of the sub-queue the block came from.
// Returns InternalError if the selected sub-queue has been canceled, and OK
// otherwise (including when no block was available).
//
// When a block is popped, the per-queue byte and block counters are decremented.
// Once the sub-queue drains to zero, the child's sink dependency is marked ready
// (push_block() blocks it when the sub-queue grows past
// _max_blocks_in_sub_queue). If this was the last queued block overall,
// set_source_block() may block the source dependency.
Status DataQueue::get_block_from_queue(std::unique_ptr<vectorized::Block>* output_block,
                                       int* child_idx) {
    // Read the selected index exactly once, so the cancel check, the lock and
    // all the bookkeeping below are guaranteed to refer to the same sub-queue.
    const int queue_idx = _flag_queue_idx;
    if (_is_canceled[queue_idx]) {
        return Status::InternalError("Current queue of idx {} has been canceled", queue_idx);
    }
    {
        INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(*_queue_blocks_lock[queue_idx]));
        if (_cur_blocks_nums_in_queue[queue_idx] > 0) {
            *output_block = std::move(_queue_blocks[queue_idx].front());
            _queue_blocks[queue_idx].pop_front();
            if (child_idx) {
                *child_idx = queue_idx;
            }
            _cur_bytes_in_queue[queue_idx] -= (*output_block)->allocated_bytes();
            _cur_blocks_nums_in_queue[queue_idx] -= 1;
            if (_cur_blocks_nums_in_queue[queue_idx] == 0) {
                // Sub-queue fully drained: release this child's sink.
                _sink_dependencies[queue_idx]->set_ready();
            }
            // fetch_sub yields the previous total. A value of 1 means no block
            // is queued anywhere anymore.
            auto old_value = _cur_blocks_total_nums.fetch_sub(1);
            if (old_value == 1 && _source_dependency) {
                set_source_block();
            }
        }
    }
    return Status::OK();
}
// Appends `block` to child_idx's sub-queue; a null block is ignored.
// Under that sub-queue's lock it also:
//  - adds the block's allocated_bytes() to the child's byte counter;
//  - bumps the child's and the global block counts;
//  - blocks the child's sink dependency once the sub-queue holds more than
//    _max_blocks_in_sub_queue blocks;
//  - marks the source ready.
void DataQueue::push_block(std::unique_ptr<vectorized::Block> block, int child_idx) {
    if (block == nullptr) {
        return;
    }
    INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> guard(*_queue_blocks_lock[child_idx]));
    // Measure the block before the move below leaves `block` empty.
    const auto incoming_bytes = block->allocated_bytes();
    _cur_bytes_in_queue[child_idx] += incoming_bytes;
    _queue_blocks[child_idx].emplace_back(std::move(block));
    auto& pending = _cur_blocks_nums_in_queue[child_idx];
    pending += 1;
    if (pending > _max_blocks_in_sub_queue) {
        // Back-pressure: stop this child's sink until the queue drains.
        _sink_dependencies[child_idx]->block();
    }
    ++_cur_blocks_total_nums;
    set_source_ready();
}
void DataQueue::set_finish(int child_idx) {
INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(*_queue_blocks_lock[child_idx]));
if (_is_finished[child_idx]) {
return;
}
_is_finished[child_idx] = true;
if (_un_finished_counter.fetch_sub(1) == 1) {
_is_all_finished = true;
}
set_source_ready();
}
// Marks child `child_idx` as canceled and finished, under its queue lock, then
// wakes the source. After this call, get_block_from_queue() returns an error
// for this child's sub-queue. The caller is expected to cancel a child that
// has not finished yet (DCHECK).
void DataQueue::set_canceled(int child_idx) {
    INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(*_queue_blocks_lock[child_idx]));
    DCHECK(!_is_finished[child_idx]);
    _is_canceled[child_idx] = true;
    // Only the first transition to "finished" may consume a slot of
    // _un_finished_counter, mirroring set_finish(). Without this guard, in
    // release builds (where the DCHECK is compiled out) a child that had
    // already finished would be counted twice. That could flag all children
    // as finished while others are still producing, or underflow the counter.
    if (!_is_finished[child_idx]) {
        _is_finished[child_idx] = true;
        if (_un_finished_counter.fetch_sub(1) == 1) {
            _is_all_finished = true;
        }
    }
    set_source_ready();
}
// Whether child `child_idx` has been marked finished, by either set_finish()
// or set_canceled().
bool DataQueue::is_finish(int child_idx) {
    const bool finished = _is_finished[child_idx];
    return finished;
}
// Whether every child has finished. The flag is set by whichever
// set_finish() or set_canceled() call consumed the last unfinished slot.
bool DataQueue::is_all_finish() {
    const bool all_done = _is_all_finished;
    return all_done;
}
// Marks the source dependency ready, if one has been attached, while holding
// _source_lock. Taking the lock serializes this with the block() call in
// set_source_block().
void DataQueue::set_source_ready() {
    if (!_source_dependency) {
        return;
    }
    std::lock_guard guard(_source_lock);
    _source_dependency->set_ready();
}
void DataQueue::set_source_block() {
if (_cur_blocks_total_nums == 0 && !is_all_finish()) {
std::unique_lock lc(_source_lock);
// Performing the judgment twice, attempting to avoid blocking the source as much as possible.
if (_cur_blocks_total_nums == 0 && !is_all_finish()) {
_source_dependency->block();
}
}
}
} // namespace pipeline
} // namespace doris