blob: f8ba26bc664d012ef94ff8272302128876af2142 [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/operator/data_queue.h"
#include <gtest/gtest.h>
#include <memory>
#include <thread>
#include <vector>
#include "core/data_type/data_type_number.h"
#include "exec/pipeline/dependency.h"
namespace doris {
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
static std::unique_ptr<Block> make_block(size_t rows = 1) {
auto block = std::make_unique<Block>();
auto col = ColumnUInt8::create(rows);
block->insert(ColumnWithTypeAndName {std::move(col), std::make_shared<DataTypeUInt8>(), ""});
return block;
}
// Create a Dependency that starts ready.
static std::shared_ptr<Dependency> make_dep(bool initially_ready = true) {
return Dependency::create_shared(1, 1, "Test", initially_ready);
}
// ---------------------------------------------------------------------------
// SubQueue tests
// ---------------------------------------------------------------------------
class SubQueueTest : public testing::Test {
public:
void SetUp() override {
dep = make_dep(/*initially_ready=*/true);
sub = std::make_unique<SubQueue>();
sub->sink_dependency = dep.get();
sub->max_blocks_in_queue = 2;
}
std::shared_ptr<Dependency> dep;
std::unique_ptr<SubQueue> sub;
std::atomic_uint32_t counter_ {0};
};
// Pop from an empty queue returns OK with null output.
TEST_F(SubQueueTest, TryPopEmpty) {
std::unique_ptr<Block> out;
sub->try_pop(&out);
EXPECT_EQ(out, nullptr);
EXPECT_EQ(sub->blocks_in_queue.load(), 0u);
}
// Basic push then pop returns the block.
TEST_F(SubQueueTest, TryPushPop_Basic) {
EXPECT_TRUE(sub->try_push(make_block(), counter_));
EXPECT_EQ(sub->blocks_in_queue.load(), 1u);
std::unique_ptr<Block> out;
sub->try_pop(&out);
EXPECT_NE(out, nullptr);
EXPECT_EQ(sub->blocks_in_queue.load(), 0u);
}
// push after mark_finished returns EndOfFile.
TEST_F(SubQueueTest, TryPushAfterFinished) {
std::atomic_uint32_t counter {1};
std::atomic_bool all_done {false};
sub->mark_finished(counter, all_done);
EXPECT_FALSE(sub->try_push(make_block(), counter_));
}
// When blocks.size() exceeds max_blocks_in_queue, sink is blocked.
TEST_F(SubQueueTest, SinkBlockedWhenFull) {
sub->max_blocks_in_queue = 2;
dep->set_ready(); // start ready
// Push up to the limit — sink should stay ready.
EXPECT_TRUE(sub->try_push(make_block(), counter_));
EXPECT_TRUE(sub->try_push(make_block(), counter_));
EXPECT_TRUE(dep->ready());
// Push one over the limit — sink should be blocked.
EXPECT_TRUE(sub->try_push(make_block(), counter_));
EXPECT_FALSE(dep->ready());
}
// Sink wakes up only when the queue becomes completely empty.
TEST_F(SubQueueTest, SinkReadyWhenQueueEmpty) {
sub->max_blocks_in_queue = 2;
// Fill to 3 (one over limit) → sink blocked.
EXPECT_TRUE(sub->try_push(make_block(), counter_));
EXPECT_TRUE(sub->try_push(make_block(), counter_));
EXPECT_TRUE(sub->try_push(make_block(), counter_));
EXPECT_FALSE(dep->ready());
// Pop 1 & 2: queue not empty yet → sink still blocked.
std::unique_ptr<Block> out;
sub->try_pop(&out);
EXPECT_NE(out, nullptr);
EXPECT_FALSE(dep->ready());
sub->try_pop(&out);
EXPECT_NE(out, nullptr);
EXPECT_FALSE(dep->ready());
// Pop 3: queue empty → set_ready().
sub->try_pop(&out);
EXPECT_NE(out, nullptr);
EXPECT_TRUE(dep->ready());
}
// mark_finished is idempotent: second call returns false and counter stays correct.
TEST_F(SubQueueTest, MarkFinishedIdempotent) {
std::atomic_uint32_t counter {2};
std::atomic_bool all_done {false};
EXPECT_TRUE(sub->mark_finished(counter, all_done));
EXPECT_EQ(counter.load(), 1u);
EXPECT_FALSE(all_done.load());
EXPECT_FALSE(sub->mark_finished(counter, all_done));
EXPECT_EQ(counter.load(), 1u); // unchanged
}
// mark_finished sets all_finished when last child finishes.
TEST_F(SubQueueTest, MarkFinishedSetsAllFinished) {
std::atomic_uint32_t counter {1};
std::atomic_bool all_done {false};
sub->mark_finished(counter, all_done);
EXPECT_TRUE(all_done.load());
}
// clear_blocks empties the queue and calls set_always_ready on sink.
TEST_F(SubQueueTest, ClearBlocksEmptiesQueue) {
EXPECT_TRUE(sub->try_push(make_block(), counter_));
EXPECT_TRUE(sub->try_push(make_block(), counter_));
EXPECT_EQ(sub->blocks_in_queue.load(), 2u);
sub->clear_blocks();
EXPECT_EQ(sub->blocks_in_queue.load(), 0u);
// set_always_ready was called → dep is always ready.
EXPECT_TRUE(dep->ready());
}
// clear_blocks on an empty queue is a no-op (set_always_ready not called).
TEST_F(SubQueueTest, ClearBlocksNoop) {
dep->block(); // start blocked
sub->clear_blocks();
EXPECT_FALSE(dep->ready()); // still blocked — clear_blocks did nothing
}
// bytes_in_queue tracks push/pop correctly.
TEST_F(SubQueueTest, BytesInQueueTracking) {
auto block = make_block(10);
int64_t expected_bytes = block->allocated_bytes();
EXPECT_TRUE(sub->try_push(std::move(block), counter_));
{
LockGuard l(sub->queue_lock);
EXPECT_EQ(sub->bytes_in_queue, expected_bytes);
}
std::unique_ptr<Block> out;
sub->try_pop(&out);
{
LockGuard l(sub->queue_lock);
EXPECT_EQ(sub->bytes_in_queue, 0);
}
}
// ---------------------------------------------------------------------------
// DataQueue fixtures
// ---------------------------------------------------------------------------
class DataQueueTest : public testing::Test {
public:
DataQueueTest() = default;
~DataQueueTest() override = default;
void SetUp() override {
data_queue = std::make_unique<DataQueue>(child_count);
for (int i = 0; i < child_count; i++) {
auto dep = Dependency::create_shared(1, 1, "DataQueueTest", true);
sink_deps.push_back(dep);
data_queue->set_sink_dependency(dep.get(), i);
}
source_dep = Dependency::create_shared(1, 1, "DataQueueTest", true);
data_queue->set_source_dependency(source_dep);
}
std::unique_ptr<DataQueue> data_queue = nullptr;
std::vector<std::shared_ptr<Dependency>> sink_deps;
std::shared_ptr<Dependency> source_dep;
const int child_count = 3;
};
// ---------------------------------------------------------------------------
// DataQueue unit tests
// ---------------------------------------------------------------------------
// Initial state: no data, no finish.
TEST_F(DataQueueTest, InitialState) {
EXPECT_FALSE(data_queue->has_more_data());
EXPECT_FALSE(data_queue->is_all_finish());
EXPECT_FALSE(data_queue->remaining_has_data());
}
// Push one block and retrieve it.
TEST_F(DataQueueTest, SinglePushPop) {
EXPECT_TRUE(data_queue->push_block(make_block(), 0).ok());
EXPECT_TRUE(data_queue->has_more_data());
// Find the queue with data.
EXPECT_TRUE(data_queue->remaining_has_data());
std::unique_ptr<Block> out;
int child_idx = -1;
EXPECT_TRUE(data_queue->get_block_from_queue(&out, &child_idx).ok());
EXPECT_NE(out, nullptr);
EXPECT_EQ(child_idx, 0);
EXPECT_FALSE(data_queue->has_more_data());
}
// is_all_finish only becomes true after all children call set_finish.
TEST_F(DataQueueTest, IsAllFinishAfterAllChildren) {
data_queue->set_finish(0);
EXPECT_FALSE(data_queue->is_all_finish());
data_queue->set_finish(1);
EXPECT_FALSE(data_queue->is_all_finish());
data_queue->set_finish(2);
EXPECT_TRUE(data_queue->is_all_finish());
}
// set_finish is idempotent.
TEST_F(DataQueueTest, SetFinishIdempotent) {
data_queue->set_finish(0);
data_queue->set_finish(0); // second call must not double-decrement
data_queue->set_finish(1);
data_queue->set_finish(2);
EXPECT_TRUE(data_queue->is_all_finish());
}
// child_idx returned by get_block_from_queue reflects the actual queue.
TEST_F(DataQueueTest, ChildIdxReturned) {
// Push to child 1 only.
EXPECT_TRUE(data_queue->push_block(make_block(), 1).ok());
data_queue->remaining_has_data(); // advance _flag_queue_idx to find child 1
std::unique_ptr<Block> out;
int child_idx = -1;
EXPECT_TRUE(data_queue->get_block_from_queue(&out, &child_idx).ok());
EXPECT_NE(out, nullptr);
EXPECT_EQ(child_idx, 1);
}
// get_free_block returns a new block when free list is empty, reuses when not.
TEST_F(DataQueueTest, FreeBlockReuse) {
// First call: allocates a new block.
auto block = data_queue->get_free_block(0);
EXPECT_NE(block, nullptr);
// Return it to the free list.
block->clear(); // ensure rows == 0
data_queue->push_free_block(std::move(block), 0);
// Second call: must return the recycled block.
auto block2 = data_queue->get_free_block(0);
EXPECT_NE(block2, nullptr);
}
// In low-memory mode push_free_block discards blocks and max drops to 1.
TEST_F(DataQueueTest, LowMemoryMode) {
// Pre-populate the free list.
data_queue->push_free_block(Block::create_unique(), 0);
data_queue->set_low_memory_mode();
// Free list must be cleared.
auto block = data_queue->get_free_block(0);
// The free list is empty → a fresh block is returned (not from the list).
EXPECT_NE(block, nullptr);
// push_free_block now discards.
block->clear();
data_queue->push_free_block(std::move(block), 0);
auto block2 = data_queue->get_free_block(0);
// Still gets a fresh allocation (free list stays empty).
EXPECT_NE(block2, nullptr);
}
// terminate() finishes all children and clears pending blocks from sub-queues.
TEST_F(DataQueueTest, Terminate) {
EXPECT_TRUE(data_queue->push_block(make_block(), 0).ok());
EXPECT_TRUE(data_queue->push_block(make_block(), 1).ok());
data_queue->terminate();
EXPECT_TRUE(data_queue->is_all_finish());
// remaining_has_data() checks blocks_in_queue per sub-queue,
// which clear_blocks() resets to 0.
EXPECT_FALSE(data_queue->remaining_has_data());
}
// set_max_blocks_in_sub_queue propagates to every sub-queue.
TEST_F(DataQueueTest, SetMaxBlocksInSubQueue) {
data_queue->set_max_blocks_in_sub_queue(5);
// Push 5 blocks to child 0 — sink must stay ready (not over the limit yet).
for (int i = 0; i < 5; i++) {
EXPECT_TRUE(data_queue->push_block(make_block(), 0).ok());
}
EXPECT_TRUE(sink_deps[0]->ready());
// 6th push exceeds limit → sink blocked.
EXPECT_TRUE(data_queue->push_block(make_block(), 0).ok());
EXPECT_FALSE(sink_deps[0]->ready());
}
// Source dependency is notified ready when a block is pushed.
TEST_F(DataQueueTest, SourceReadyOnPush) {
source_dep->block(); // start blocked
EXPECT_FALSE(source_dep->ready());
EXPECT_TRUE(data_queue->push_block(make_block(), 0).ok());
EXPECT_TRUE(source_dep->ready());
}
// ---------------------------------------------------------------------------
// Multi-threaded integration test (existing)
// ---------------------------------------------------------------------------
TEST_F(DataQueueTest, MultiTest) {
int output_count = 0;
auto output_func = [&]() {
while (true) {
bool eos = false;
if (source_dep->ready()) {
Defer set_eos {[&]() {
if (data_queue->remaining_has_data()) {
eos = false;
} else if (data_queue->is_all_finish()) {
eos = !data_queue->remaining_has_data();
} else {
eos = false;
}
}};
std::unique_ptr<Block> output_block;
int child_idx = 0;
EXPECT_TRUE(data_queue->get_block_from_queue(&output_block, &child_idx));
if (output_block) {
output_count++;
}
}
if (eos) {
break;
}
}
};
std::vector<std::unique_ptr<Block>> input_blocks[3];
for (int i = 0; i < 3; i++) {
for (int j = 0; j < 50; j++) {
auto block = std::make_unique<Block>();
block->insert(ColumnWithTypeAndName {ColumnUInt8::create(1),
std::make_shared<DataTypeUInt8>(), ""});
input_blocks[i].push_back(std::move(block));
}
}
auto input_func = [&](int id) {
int i = 0;
while (i < 50) {
if (sink_deps[id]->ready()) {
EXPECT_TRUE(data_queue->push_block(std::move(input_blocks[id][i]), id).ok());
i++;
}
}
data_queue->set_finish(id);
};
std::thread input1(input_func, 0);
std::thread input2(input_func, 1);
std::thread input3(input_func, 2);
std::thread output1(output_func);
input1.join();
input2.join();
input3.join();
output1.join();
EXPECT_EQ(output_count, 150);
EXPECT_TRUE(data_queue->is_all_finish());
}
// ./run-be-ut.sh --run --filter=DataQueueTest.*
} // namespace doris