blob: 0bf6691d7d0bc5b7ee3121e3ba9af7ae45873d65 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "vec/runtime/vdata_stream_recvr.h"
#include <gtest/gtest.h>
#include <memory>
#include <thread>
#include <vector>
#include "pipeline/dependency.h"
#include "pipeline/exec/multi_cast_data_streamer.h"
#include "testutil/column_helper.h"
#include "testutil/mock/mock_runtime_state.h"
#include "vec/data_types/data_type_number.h"
#include "vec/runtime/vdata_stream_mgr.h"
namespace doris::pipeline {
using namespace vectorized;
struct MockVDataStreamRecvr : public VDataStreamRecvr {
MockVDataStreamRecvr(RuntimeState* state, RuntimeProfile::HighWaterMarkCounter* counter,
RuntimeProfile* profile, int num_senders, bool is_merging)
: VDataStreamRecvr(nullptr, counter, state, TUniqueId(), 0, num_senders, is_merging,
profile, 1) {};
bool exceeds_limit(size_t block_byte_size) override {
if (always_exceeds_limit) {
return true;
}
return VDataStreamRecvr::exceeds_limit(block_byte_size);
}
bool always_exceeds_limit = false;
};
class DataStreamRecvrTest : public testing::Test {
public:
DataStreamRecvrTest() = default;
~DataStreamRecvrTest() override = default;
void SetUp() override {}
void create_recvr(int num_senders, bool is_merging) {
_mock_counter =
std::make_unique<RuntimeProfile::HighWaterMarkCounter>(TUnit::UNIT, 0, "test");
_mock_state = std::make_unique<MockRuntimeState>();
_mock_profile = std::make_unique<RuntimeProfile>("test");
recvr = std::make_shared<MockVDataStreamRecvr>(_mock_state.get(), _mock_counter.get(),
_mock_profile.get(), num_senders,
is_merging);
}
std::shared_ptr<MockVDataStreamRecvr> recvr;
std::unique_ptr<RuntimeProfile::HighWaterMarkCounter> _mock_counter;
std::unique_ptr<MockRuntimeState> _mock_state;
std::unique_ptr<RuntimeProfile> _mock_profile;
};
TEST_F(DataStreamRecvrTest, TestCreateSenderQueue) {
{
create_recvr(3, false);
EXPECT_EQ(recvr->sender_queues().size(), 1);
EXPECT_EQ(recvr->sender_queues().back()->_num_remaining_senders, 3);
}
{
create_recvr(3, true);
EXPECT_EQ(recvr->sender_queues().size(), 3);
for (auto& queue : recvr->sender_queues()) {
EXPECT_EQ(queue->_num_remaining_senders, 1);
}
}
}
TEST_F(DataStreamRecvrTest, TestSender) {
create_recvr(3, false);
EXPECT_EQ(recvr->sender_queues().size(), 1);
EXPECT_EQ(recvr->sender_queues().back()->_num_remaining_senders, 3);
auto* sender = recvr->sender_queues().back();
auto sink_dep = sender->_local_channel_dependency;
auto source_dep = std::make_shared<Dependency>(0, 0, "test", false);
sender->set_dependency(source_dep);
EXPECT_EQ(sink_dep->ready(), true);
EXPECT_EQ(source_dep->ready(), false);
EXPECT_EQ(sender->_num_remaining_senders, 3);
EXPECT_EQ(sender->_block_queue.size(), 0);
{
auto block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5});
sender->add_block(&block, false);
}
EXPECT_EQ(sink_dep->ready(), false);
{
EXPECT_EQ(sender->_block_queue.size(), 1);
Block block;
bool eos = false;
auto st = sender->get_batch(&block, &eos);
EXPECT_TRUE(st) << st.msg();
EXPECT_TRUE(ColumnHelper::block_equal(
block, ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5})));
EXPECT_FALSE(eos);
}
{
sender->decrement_senders(1);
EXPECT_EQ(sender->_num_remaining_senders, 2);
sender->decrement_senders(2);
EXPECT_EQ(sender->_num_remaining_senders, 1);
sender->decrement_senders(3);
EXPECT_EQ(sender->_num_remaining_senders, 0);
EXPECT_EQ(sender->_block_queue.size(), 0);
auto block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5});
sender->add_block(&block, false);
EXPECT_EQ(sender->_block_queue.size(), 0);
}
{
EXPECT_EQ(sender->_block_queue.size(), 0);
Block block;
bool eos = false;
auto st = sender->get_batch(&block, &eos);
EXPECT_TRUE(st) << st.msg();
EXPECT_TRUE(eos);
}
sender->close();
}
TEST_F(DataStreamRecvrTest, TestSenderClose) {
create_recvr(3, false);
EXPECT_EQ(recvr->sender_queues().size(), 1);
EXPECT_EQ(recvr->sender_queues().back()->_num_remaining_senders, 3);
auto* sender = recvr->sender_queues().back();
auto sink_dep = sender->_local_channel_dependency;
sender->close();
}
TEST_F(DataStreamRecvrTest, TestRandomSender) {
create_recvr(3, false);
EXPECT_EQ(recvr->sender_queues().size(), 1);
EXPECT_EQ(recvr->sender_queues().back()->_num_remaining_senders, 3);
auto* sender = recvr->sender_queues().back();
auto sink_dep = sender->_local_channel_dependency;
auto source_dep = std::make_shared<Dependency>(0, 0, "test", false);
sender->set_dependency(source_dep);
EXPECT_EQ(sink_dep->ready(), true);
EXPECT_EQ(source_dep->ready(), false);
auto input_func = [&](int id) {
mock_random_sleep();
int input_block = 0;
while (true) {
if (sink_dep->ready()) {
auto block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5});
sender->add_block(&block, false);
input_block++;
if (input_block == 100) {
sender->decrement_senders(id);
break;
}
}
}
};
auto outout_func = [&]() {
mock_random_sleep();
int output_block = 0;
while (true) {
if (source_dep->ready()) {
Block block;
bool eos = false;
auto st = sender->get_batch(&block, &eos);
EXPECT_TRUE(st) << st.msg();
if (!block.empty()) {
output_block++;
}
if (eos) {
EXPECT_EQ(output_block, 3 * 100);
break;
}
}
}
};
std::thread input1(input_func, 1);
std::thread input2(input_func, 2);
std::thread input3(input_func, 3);
std::thread output(outout_func);
input1.join();
input2.join();
input3.join();
output.join();
}
TEST_F(DataStreamRecvrTest, TestRandomCloseSender) {
create_recvr(3, false);
EXPECT_EQ(recvr->sender_queues().size(), 1);
EXPECT_EQ(recvr->sender_queues().back()->_num_remaining_senders, 3);
auto* sender = recvr->sender_queues().back();
auto sink_dep = sender->_local_channel_dependency;
auto source_dep = std::make_shared<Dependency>(0, 0, "test", false);
sender->set_dependency(source_dep);
EXPECT_EQ(sink_dep->ready(), true);
EXPECT_EQ(source_dep->ready(), false);
auto input_func = [&](int id) {
mock_random_sleep();
int input_block = 0;
while (true) {
if (sink_dep->ready()) {
auto block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5});
sender->add_block(&block, false);
input_block++;
if (input_block == 100) {
sender->decrement_senders(id);
break;
}
}
}
std::cout << "input func : " << id << " end " << std::endl;
};
auto outout_func = [&]() {
mock_random_sleep();
try {
while (true) {
if (source_dep->ready()) {
Block block;
bool eos = false;
auto st = sender->get_batch(&block, &eos);
if (!st.ok()) {
std::cout << "get_batch error: " << st.msg() << std::endl;
break;
}
if (eos) {
break;
}
}
}
} catch (std::exception& e) {
std::cout << "exception: " << e.what() << std::endl;
}
std::cout << "output func end" << std::endl;
};
auto close_func = [&]() {
try {
mock_random_sleep();
std::cout << "close_func start" << std::endl;
recvr->close();
std::cout << "close_func end" << std::endl;
} catch (const std::exception& e) {
std::cout << "close exception: " << e.what() << std::endl;
}
};
std::vector<std::thread> threads;
threads.emplace_back(input_func, 1);
threads.emplace_back(input_func, 2);
threads.emplace_back(input_func, 3);
threads.emplace_back(outout_func);
threads.emplace_back(close_func);
for (auto& t : threads) {
if (t.joinable()) {
try {
t.join();
} catch (const std::system_error& e) {
std::cout << "Thread join error: " << e.what() << std::endl;
}
}
}
}
class MockClosure : public google::protobuf::Closure {
MockClosure() = default;
~MockClosure() override = default;
void Run() override { _cb(); }
std::function<void()> _cb;
};
void to_pblock(Block& block, PBlock* pblock) {
size_t uncompressed_bytes = 0;
size_t compressed_bytes = 0;
int64_t compressed_time = 0;
EXPECT_TRUE(block.serialize(BeExecVersionManager::get_newest_version(), pblock,
&uncompressed_bytes, &compressed_bytes, &compressed_time,
segment_v2::CompressionTypePB::NO_COMPRESSION));
}
TEST_F(DataStreamRecvrTest, TestRemoteSender) {
create_recvr(3, false);
EXPECT_EQ(recvr->sender_queues().size(), 1);
EXPECT_EQ(recvr->sender_queues().back()->_num_remaining_senders, 3);
auto* sender = recvr->sender_queues().back();
auto sink_dep = sender->_local_channel_dependency;
auto source_dep = std::make_shared<Dependency>(0, 0, "test", false);
sender->set_dependency(source_dep);
EXPECT_EQ(sink_dep->ready(), true);
EXPECT_EQ(source_dep->ready(), false);
EXPECT_EQ(sender->_num_remaining_senders, 3);
EXPECT_EQ(sender->_block_queue.size(), 0);
{
auto block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5});
auto pblock = std::make_unique<PBlock>();
to_pblock(block, pblock.get());
MockClosure closure;
closure._cb = [&]() { std::cout << "cb" << std::endl; };
google::protobuf::Closure* done = &closure;
auto st = sender->add_block(std::move(pblock), 1, 1, &done, 0, 0);
if (done != nullptr) {
done->Run();
}
}
sender->close();
}
TEST_F(DataStreamRecvrTest, TestRemoteMemLimitSender) {
create_recvr(3, false);
EXPECT_EQ(recvr->sender_queues().size(), 1);
EXPECT_EQ(recvr->sender_queues().back()->_num_remaining_senders, 3);
auto* sender = recvr->sender_queues().back();
auto sink_dep = sender->_local_channel_dependency;
auto source_dep = std::make_shared<Dependency>(0, 0, "test", false);
sender->set_dependency(source_dep);
EXPECT_EQ(sink_dep->ready(), true);
EXPECT_EQ(source_dep->ready(), false);
EXPECT_EQ(sender->_num_remaining_senders, 3);
EXPECT_EQ(sender->_block_queue.size(), 0);
config::exchg_node_buffer_size_bytes = 1;
Defer set_([&]() { config::exchg_node_buffer_size_bytes = 20485760; });
{
auto block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5});
auto pblock = std::make_unique<PBlock>();
to_pblock(block, pblock.get());
MockClosure closure;
bool flag = false;
closure._cb = [&]() {
std::cout << "cb" << std::endl;
EXPECT_TRUE(flag);
};
google::protobuf::Closure* done = &closure;
auto st = sender->add_block(std::move(pblock), 1, 1, &done, 0, 0);
EXPECT_EQ(done, nullptr);
flag = true;
{
Block block;
bool eos = false;
auto st = sender->get_batch(&block, &eos);
}
}
sender->close();
}
TEST_F(DataStreamRecvrTest, TestRemoteMultiSender) {
create_recvr(3, false);
EXPECT_EQ(recvr->sender_queues().size(), 1);
EXPECT_EQ(recvr->sender_queues().back()->_num_remaining_senders, 3);
auto* sender = recvr->sender_queues().back();
auto sink_dep = sender->_local_channel_dependency;
auto source_dep = std::make_shared<Dependency>(0, 0, "test", false);
sender->set_dependency(source_dep);
EXPECT_EQ(sink_dep->ready(), true);
EXPECT_EQ(source_dep->ready(), false);
std::vector<std::shared_ptr<MockClosure>> closures {10};
for (auto i = 0; i < 10; i++) {
closures[i] = std::make_shared<MockClosure>();
}
auto input_func = [&](int id) {
mock_random_sleep();
int input_block = 0;
auto closure = closures[id];
std::atomic_bool cb_flag = true;
closure->_cb = [&]() { cb_flag = true; };
while (true) {
if (sink_dep->ready() && cb_flag) {
auto block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5});
auto pblock = std::make_unique<PBlock>();
to_pblock(block, pblock.get());
google::protobuf::Closure* done = closure.get();
cb_flag = false;
auto st = sender->add_block(std::move(pblock), id, input_block, &done, 0, 0);
EXPECT_TRUE(st) << st.msg();
input_block++;
if (done != nullptr) {
done->Run();
}
if (input_block == 100) {
sender->decrement_senders(id);
break;
}
}
}
std::cout << "input func : " << id << " end "
<< "input_block : " << input_block << std::endl;
};
auto outout_func = [&]() {
mock_random_sleep();
int output_block = 0;
while (true) {
if (source_dep->ready()) {
Block block;
bool eos = false;
auto st = sender->get_batch(&block, &eos);
EXPECT_TRUE(st) << st.msg();
if (!block.empty()) {
output_block++;
}
if (eos) {
EXPECT_EQ(output_block, 3 * 100);
break;
}
}
}
};
std::thread input1(input_func, 1);
std::thread input2(input_func, 2);
std::thread input3(input_func, 3);
std::thread output(outout_func);
input1.join();
input2.join();
input3.join();
output.join();
}
TEST_F(DataStreamRecvrTest, TestRemoteLocalMultiSender) {
create_recvr(3, false);
EXPECT_EQ(recvr->sender_queues().size(), 1);
EXPECT_EQ(recvr->sender_queues().back()->_num_remaining_senders, 3);
auto* sender = recvr->sender_queues().back();
auto sink_dep = sender->_local_channel_dependency;
auto source_dep = std::make_shared<Dependency>(0, 0, "test", false);
sender->set_dependency(source_dep);
EXPECT_EQ(sink_dep->ready(), true);
EXPECT_EQ(source_dep->ready(), false);
std::vector<std::shared_ptr<MockClosure>> closures {10};
for (auto i = 0; i < 10; i++) {
closures[i] = std::make_shared<MockClosure>();
}
auto input_remote_func = [&](int id) {
mock_random_sleep();
int input_block = 0;
auto closure = closures[id];
std::atomic_bool cb_flag = true;
closure->_cb = [&]() { cb_flag = true; };
while (true) {
if (sink_dep->ready() && cb_flag) {
auto block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5});
auto pblock = std::make_unique<PBlock>();
to_pblock(block, pblock.get());
google::protobuf::Closure* done = closure.get();
cb_flag = false;
auto st = sender->add_block(std::move(pblock), id, input_block, &done, 0, 0);
EXPECT_TRUE(st) << st.msg();
input_block++;
if (done != nullptr) {
done->Run();
}
if (input_block == 100) {
sender->decrement_senders(id);
break;
}
}
}
};
auto input_local_func = [&](int id) {
mock_random_sleep();
int input_block = 0;
while (true) {
if (sink_dep->ready()) {
auto block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5});
sender->add_block(&block, false);
input_block++;
if (input_block == 100) {
sender->decrement_senders(id);
break;
}
}
}
};
auto outout_func = [&]() {
mock_random_sleep();
int output_block = 0;
while (true) {
if (source_dep->ready()) {
Block block;
bool eos = false;
auto st = sender->get_batch(&block, &eos);
EXPECT_TRUE(st) << st.msg();
if (!block.empty()) {
output_block++;
}
if (eos) {
EXPECT_EQ(output_block, 3 * 100);
break;
}
}
}
};
std::thread input1(input_remote_func, 1);
std::thread input2(input_local_func, 2);
std::thread input3(input_remote_func, 3);
std::thread output(outout_func);
input1.join();
input2.join();
input3.join();
output.join();
}
struct MockVDataStreamMgr : public VDataStreamMgr {
~MockVDataStreamMgr() override = default;
Status find_recvr(const TUniqueId& fragment_instance_id, PlanNodeId node_id,
std::shared_ptr<VDataStreamRecvr>* res, bool acquire_lock = true) override {
*res = recvr;
return Status::OK();
}
std::shared_ptr<VDataStreamRecvr> recvr;
};
TEST_F(DataStreamRecvrTest, transmit_block) {
create_recvr(1, true);
recvr->always_exceeds_limit = true;
MockVDataStreamMgr mgr;
mgr.recvr = recvr;
MockClosure closure;
closure._cb = [&]() { std::cout << "cb" << std::endl; };
google::protobuf::Closure* done = &closure;
PTransmitDataParams request;
{
auto* pblock = request.add_blocks();
auto block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5});
to_pblock(block, pblock);
}
{
auto* pblock = request.add_blocks();
auto block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5});
to_pblock(block, pblock);
}
{
auto st = mgr.transmit_block(&request, &done, 1000);
EXPECT_TRUE(st) << st.msg();
}
recvr->close();
}
// ./run-be-ut.sh --run --filter=DataStreamRecvrTest.*
} // namespace doris::pipeline