blob: 72170bc7fe048a4a0f0a647fd382973e7a7592ba [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "pipeline/exec/multi_cast_data_streamer.h"
#include <gtest/gtest.h>
#include <memory>
#include <vector>
#include "olap/olap_define.h"
#include "pipeline/dependency.h"
#include "testutil/column_helper.h"
#include "testutil/mock/mock_runtime_state.h"
#include "util/runtime_profile.h"
#include "vec/spill/spill_stream_manager.h"
namespace doris::pipeline {
class MultiCastDataStreamerTest : public testing::Test {
public:
MultiCastDataStreamerTest() = default;
~MultiCastDataStreamerTest() override = default;
void SetUp() override {
profile = std::make_unique<RuntimeProfile>("MultiCastDataStreamerTest");
custom_profile = std::make_unique<RuntimeProfile>("CustomCounters");
common_profile = std::make_unique<RuntimeProfile>("CommonCounters");
{
ADD_COUNTER_WITH_LEVEL(common_profile.get(), "MemoryUsage", TUnit::BYTES, 1);
ADD_TIMER_WITH_LEVEL(common_profile.get(), "ExecTime", 1);
ADD_TIMER_WITH_LEVEL(custom_profile.get(), "SpillTotalTime", 1);
ADD_TIMER_WITH_LEVEL(custom_profile.get(), "SpillWriteTime", 1);
ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillWriteTaskWaitInQueueCount",
TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillWriteTaskCount", TUnit::UNIT, 1);
ADD_TIMER_WITH_LEVEL(custom_profile.get(), "SpillWriteTaskWaitInQueueTime", 1);
ADD_TIMER_WITH_LEVEL(custom_profile.get(), "SpillWriteFileTime", 1);
ADD_TIMER_WITH_LEVEL(custom_profile.get(), "SpillWriteSerializeBlockTime", 1);
ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillWriteBlockCount", TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillWriteBlockBytes", TUnit::BYTES, 1);
ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillWriteFileBytes", TUnit::BYTES, 1);
ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillWriteRows", TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillReadFileTime", TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillReadDeserializeBlockTime",
TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillReadBlockCount", TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillReadBlockBytes", TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillReadFileBytes", TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillReadRows", TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillReadFileCount", TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillWriteFileTotalCount", TUnit::UNIT,
1);
ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillWriteFileCurrentCount", TUnit::UNIT,
1);
ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillWriteFileCurrentBytes", TUnit::UNIT,
1);
}
profile->add_child(custom_profile.get(), true);
profile->add_child(common_profile.get(), true);
shared_state = std::make_shared<MultiCastSharedState>(&pool, cast_sender_count, 0);
multi_cast_data_streamer =
std::make_unique<MultiCastDataStreamer>(&pool, cast_sender_count, 0);
shared_state->setup_shared_profile(profile.get());
multi_cast_data_streamer->set_sink_profile(profile.get());
source_profiles.resize(cast_sender_count);
for (int i = 0; i < cast_sender_count; i++) {
auto dep = Dependency::create_shared(1, 1, "MultiCastDataStreamerTest", true);
deps.push_back(dep);
multi_cast_data_streamer->set_dep_by_sender_idx(i, dep.get());
source_profiles[i] =
std::make_unique<RuntimeProfile>(fmt::format("source_profile_{}", i));
source_common_profiles.push_back(std::make_unique<RuntimeProfile>("CommonCounters"));
source_custom_profiles.push_back(std::make_unique<RuntimeProfile>("CustomCounters"));
source_profiles[i]->add_child(source_common_profiles[i].get(), true);
source_profiles[i]->add_child(source_custom_profiles[i].get(), true);
ADD_TIMER_WITH_LEVEL(source_common_profiles[i].get(), "ExecTime", 1);
ADD_TIMER_WITH_LEVEL(source_custom_profiles[i].get(), "SpillTotalTime", 1);
ADD_TIMER_WITH_LEVEL(source_custom_profiles[i].get(), "SpillRecoverTime", 1);
ADD_COUNTER_WITH_LEVEL(source_custom_profiles[i].get(), "SpillReadTaskWaitInQueueCount",
TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(source_custom_profiles[i].get(), "SpillReadTaskCount",
TUnit::UNIT, 1);
ADD_TIMER_WITH_LEVEL(source_custom_profiles[i].get(), "SpillReadTaskWaitInQueueTime",
1);
ADD_TIMER_WITH_LEVEL(source_custom_profiles[i].get(), "SpillReadFileTime", 1);
ADD_TIMER_WITH_LEVEL(source_custom_profiles[i].get(), "SpillReadDeserializeBlockTime",
1);
ADD_COUNTER_WITH_LEVEL(source_custom_profiles[i].get(), "SpillReadBlockCount",
TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(source_custom_profiles[i].get(), "SpillReadBlockBytes",
TUnit::BYTES, 1);
ADD_COUNTER_WITH_LEVEL(source_custom_profiles[i].get(), "SpillReadFileBytes",
TUnit::BYTES, 1);
ADD_COUNTER_WITH_LEVEL(source_custom_profiles[i].get(), "SpillReadRows", TUnit::UNIT,
1);
ADD_COUNTER_WITH_LEVEL(source_custom_profiles[i].get(), "SpillReadFileCount",
TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(source_custom_profiles[i].get(), "SpillWriteFileCurrentBytes",
TUnit::BYTES, 1);
ADD_COUNTER_WITH_LEVEL(source_custom_profiles[i].get(), "SpillWriteFileCurrentCount",
TUnit::UNIT, 1);
multi_cast_data_streamer->set_source_profile(i, source_profiles[i].get());
}
write_dependency =
Dependency::create_shared(1, 1, "MultiCastDataStreamerTestWriteDep", true);
multi_cast_data_streamer->set_write_dependency(write_dependency.get());
fragment_mgr = ExecEnv::GetInstance()->_fragment_mgr;
ExecEnv::GetInstance()->_fragment_mgr =
new MockFragmentManager(spill_status, ExecEnv::GetInstance());
auto spill_data_dir =
std::make_unique<vectorized::SpillDataDir>("./ut_dir/spill_test", 1024L * 1024 * 4);
auto st = io::global_local_filesystem()->create_directory(spill_data_dir->path(), false);
EXPECT_TRUE(st.ok()) << "create directory: " << spill_data_dir->path()
<< " failed: " << st.to_string();
std::unordered_map<std::string, std::unique_ptr<vectorized::SpillDataDir>> data_map;
data_map.emplace("test", std::move(spill_data_dir));
auto* spill_stream_manager = new vectorized::SpillStreamManager(std::move(data_map));
ExecEnv::GetInstance()->_spill_stream_mgr = spill_stream_manager;
st = spill_stream_manager->init();
EXPECT_TRUE(st.ok()) << "init spill stream manager failed: " << st.to_string();
EXPECT_EQ(state.enable_spill(), false);
}
void TearDown() override {
ExecEnv::GetInstance()->_fragment_mgr->stop();
SAFE_DELETE(ExecEnv::GetInstance()->_fragment_mgr);
ExecEnv::GetInstance()->_fragment_mgr = fragment_mgr;
doris::ExecEnv::GetInstance()->spill_stream_mgr()->stop();
SAFE_DELETE(ExecEnv::GetInstance()->_spill_stream_mgr);
}
ObjectPool pool;
std::unique_ptr<MultiCastDataStreamer> multi_cast_data_streamer = nullptr;
std::vector<std::shared_ptr<Dependency>> deps;
std::shared_ptr<Dependency> write_dependency;
int cast_sender_count = 3;
MockRuntimeState state;
Status spill_status;
FragmentMgr* fragment_mgr {nullptr};
std::shared_ptr<MultiCastSharedState> shared_state;
std::unique_ptr<RuntimeProfile> profile;
std::vector<std::unique_ptr<RuntimeProfile>> source_profiles;
std::vector<std::unique_ptr<RuntimeProfile>> source_common_profiles;
std::vector<std::unique_ptr<RuntimeProfile>> source_custom_profiles;
std::unique_ptr<RuntimeProfile> custom_profile;
std::unique_ptr<RuntimeProfile> common_profile;
};
TEST_F(MultiCastDataStreamerTest, NormTest) {
using namespace vectorized;
for (auto dep : deps) {
EXPECT_FALSE(dep->ready());
}
{
Block block = ColumnHelper::create_block<DataTypeInt64>({1, 2, 3});
EXPECT_TRUE(multi_cast_data_streamer->push(&state, &block, false).ok());
}
{
Block block = ColumnHelper::create_block<DataTypeString>({"a", "b", "c"});
EXPECT_TRUE(multi_cast_data_streamer->push(&state, &block, true).ok());
}
for (auto dep : deps) {
EXPECT_TRUE(dep->ready());
}
{
for (int id = 0; id < cast_sender_count; id++) {
Block block1;
bool eos = false;
EXPECT_TRUE(multi_cast_data_streamer->pull(&state, id, &block1, &eos).ok());
EXPECT_FALSE(eos);
EXPECT_TRUE(ColumnHelper::block_equal(
block1, ColumnHelper::create_block<DataTypeInt64>({1, 2, 3})));
Block block2;
EXPECT_TRUE(multi_cast_data_streamer->pull(&state, id, &block2, &eos).ok());
EXPECT_TRUE(eos);
EXPECT_TRUE(ColumnHelper::block_equal(
block2, ColumnHelper::create_block<DataTypeString>({"a", "b", "c"})));
}
}
}
TEST_F(MultiCastDataStreamerTest, MultiTest) {
using namespace vectorized;
std::vector<Block> blocks;
const auto input_count = 50;
for (int i = 0; i < input_count; i++) {
Block block = ColumnHelper::create_block<DataTypeInt64>({i, i + 1, i + 2});
blocks.push_back(block);
}
for (auto dep : deps) {
EXPECT_FALSE(dep->ready());
}
std::vector<std::vector<Block>> output_blocks(cast_sender_count);
auto output_func = [&](int id) {
while (true) {
bool eos = false;
Block block;
if (deps[id]->ready()) {
EXPECT_TRUE(multi_cast_data_streamer->pull(&state, id, &block, &eos).ok());
output_blocks[id].push_back(block);
}
if (eos) {
break;
}
}
};
std::thread output1(output_func, 0);
std::thread output2(output_func, 1);
std::thread output3(output_func, 2);
std::thread input([&] {
for (int i = 0; i < input_count; i++) {
EXPECT_TRUE(
multi_cast_data_streamer->push(&state, &blocks[i], i == input_count - 1).ok());
}
});
input.join();
output1.join();
output2.join();
output3.join();
for (int i = 0; i < input_count; i++) {
EXPECT_TRUE(ColumnHelper::block_equal(
output_blocks[0][i], ColumnHelper::create_block<DataTypeInt64>({i, i + 1, i + 2})));
EXPECT_TRUE(ColumnHelper::block_equal(
output_blocks[1][i], ColumnHelper::create_block<DataTypeInt64>({i, i + 1, i + 2})));
EXPECT_TRUE(ColumnHelper::block_equal(
output_blocks[2][i], ColumnHelper::create_block<DataTypeInt64>({i, i + 1, i + 2})));
}
}
TEST_F(MultiCastDataStreamerTest, SpillTest) {
using namespace vectorized;
state.set_enable_spill(true);
auto exchg_node_buffer_size_bytes = config::exchg_node_buffer_size_bytes;
config::exchg_node_buffer_size_bytes = 1;
Defer defer {[&] { config::exchg_node_buffer_size_bytes = exchg_node_buffer_size_bytes; }};
std::vector<Block> blocks;
const auto input_count = 50;
for (int i = 0; i < input_count; i++) {
Block block = ColumnHelper::create_block<DataTypeInt64>({i, i + 1, i + 2});
blocks.push_back(block);
}
for (auto dep : deps) {
EXPECT_FALSE(dep->ready());
}
std::vector<std::vector<Block>> output_blocks(cast_sender_count);
auto output_func = [&](int id) {
SCOPED_INIT_THREAD_CONTEXT();
while (true) {
if (!deps[id]->ready()) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
continue;
}
bool eos = false;
Block block;
EXPECT_TRUE(multi_cast_data_streamer->pull(&state, id, &block, &eos).ok());
if (!block.empty()) {
output_blocks[id].push_back(block);
}
if (eos) {
break;
}
}
};
std::thread output1(output_func, 0);
std::thread input([&] {
for (int i = 0; i < input_count;) {
if (!write_dependency->ready()) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
continue;
}
EXPECT_TRUE(
multi_cast_data_streamer->push(&state, &blocks[i], i == input_count - 1).ok());
i++;
}
});
input.join();
std::cout << "profile: " << profile->pretty_print() << std::endl;
ASSERT_TRUE(spill_status.ok()) << "spill status: " << spill_status.to_string();
std::thread output2(output_func, 1);
std::thread output3(output_func, 2);
output1.join();
output2.join();
output3.join();
ASSERT_EQ(multi_cast_data_streamer->_multi_cast_blocks.size(), 0);
ASSERT_EQ(multi_cast_data_streamer->_cached_blocks[0].size(), 0);
ASSERT_EQ(multi_cast_data_streamer->_cached_blocks[1].size(), 0);
ASSERT_EQ(multi_cast_data_streamer->_cached_blocks[2].size(), 0);
ASSERT_EQ(multi_cast_data_streamer->_spill_readers[0].size(), 0);
ASSERT_EQ(multi_cast_data_streamer->_spill_readers[1].size(), 0);
ASSERT_EQ(multi_cast_data_streamer->_spill_readers[2].size(), 0);
auto debug_string = multi_cast_data_streamer->debug_string();
EXPECT_TRUE(debug_string.find("MemSize:") != std::string::npos);
for (int i = 0; i < input_count; i++) {
// std::cout << output_blocks[0][i].dump_data() << std::endl;
ASSERT_TRUE(ColumnHelper::block_equal(
output_blocks[0][i], ColumnHelper::create_block<DataTypeInt64>({i, i + 1, i + 2})))
<< "i: " << i;
// std::cout << output_blocks[1][i].dump_data() << std::endl;
ASSERT_TRUE(ColumnHelper::block_equal(
output_blocks[1][i], ColumnHelper::create_block<DataTypeInt64>({i, i + 1, i + 2})))
<< "i: " << i;
// std::cout << output_blocks[2][i].dump_data() << std::endl;
ASSERT_TRUE(ColumnHelper::block_equal(
output_blocks[2][i], ColumnHelper::create_block<DataTypeInt64>({i, i + 1, i + 2})))
<< "i: " << i;
}
}
// ./run-be-ut.sh --run --filter=MultiCastDataStreamerTest.*
} // namespace doris::pipeline