blob: d349c2dad4f9dfe9e767965bcb794ae0d747834a [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "pipeline/pipeline.h"
#include <glog/logging.h>
#include <gtest/gtest.h>
#include "common/exception.h"
#include "common/status.h"
#include "dummy_task_queue.h"
#include "exprs/bloom_filter_func.h"
#include "exprs/hybrid_set.h"
#include "pipeline/dependency.h"
#include "pipeline/exec/exchange_source_operator.h"
#include "pipeline/exec/hashjoin_build_sink.h"
#include "pipeline/exec/hashjoin_probe_operator.h"
#include "pipeline/pipeline_fragment_context.h"
#include "runtime/descriptor_helper.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
#include "runtime_filter/runtime_filter_definitions.h"
#include "thrift_builder.h"
#include "vec/columns/column.h"
#include "vec/columns/column_vector.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_number.h"
#include "vec/runtime/vdata_stream_mgr.h"
namespace doris::pipeline {
// No-op report callback handed to PipelineFragmentContext in these tests
// (the tests never exercise the fragment's report path).
static void empty_function(RuntimeState*, Status*) {}
class PipelineTest : public testing::Test {
public:
PipelineTest()
: _obj_pool(new ObjectPool()),
_mgr(std::make_unique<doris::vectorized::VDataStreamMgr>()) {}
~PipelineTest() override = default;
void SetUp() override {
_query_options = TQueryOptionsBuilder()
.set_enable_local_exchange(true)
.set_enable_local_shuffle(true)
.set_runtime_filter_max_in_num(15)
.build();
auto fe_address = TNetworkAddress();
fe_address.hostname = LOCALHOST;
fe_address.port = DUMMY_PORT;
_query_ctx =
QueryContext::create(_query_id, ExecEnv::GetInstance(), _query_options, fe_address,
true, fe_address, QuerySource::INTERNAL_FRONTEND);
_query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
TRuntimeFilterParamsBuilder().build());
ExecEnv::GetInstance()->set_stream_mgr(_mgr.get());
_task_scheduler = std::make_unique<MockTaskScheduler>();
_query_ctx->_task_scheduler = _task_scheduler.get();
}
void TearDown() override {}
private:
std::shared_ptr<Pipeline> _build_pipeline(int num_instances, Pipeline* parent = nullptr) {
auto pip = std::make_shared<Pipeline>(
_next_pipeline_id(),
parent ? std::min(parent->num_tasks(), num_instances) : num_instances,
parent ? parent->num_tasks() : num_instances);
_pipelines.push_back(pip);
_pipeline_tasks.push_back(std::vector<std::shared_ptr<PipelineTask>> {});
_runtime_states.push_back(std::vector<std::unique_ptr<RuntimeState>> {});
_pipeline_profiles.push_back(nullptr);
if (parent) {
parent->set_children(pip);
}
return pip;
}
std::shared_ptr<PipelineFragmentContext> _build_fragment_context() {
int fragment_id = _next_fragment_id();
_context.push_back(std::make_shared<PipelineFragmentContext>(
_query_id, TPipelineFragmentParams(), _query_ctx, ExecEnv::GetInstance(),
empty_function,
std::bind<Status>(std::mem_fn(&FragmentMgr::trigger_pipeline_context_report),
ExecEnv::GetInstance()->fragment_mgr(), std::placeholders::_1,
std::placeholders::_2)));
_runtime_state.push_back(RuntimeState::create_unique(
_query_id, fragment_id, _query_options, _query_ctx->query_globals,
ExecEnv::GetInstance(), _query_ctx.get()));
_runtime_state.back()->set_task_execution_context(
std::static_pointer_cast<TaskExecutionContext>(_context.back()));
return _context.back();
}
TUniqueId _next_ins_id() {
_ins_id.lo++;
return _ins_id;
}
void _reset() {
auto fe_address = TNetworkAddress();
fe_address.hostname = LOCALHOST;
fe_address.port = DUMMY_PORT;
_query_ctx =
QueryContext::create(_query_id, ExecEnv::GetInstance(), _query_options, fe_address,
true, fe_address, QuerySource::INTERNAL_FRONTEND);
_task_scheduler = std::make_unique<MockTaskScheduler>();
_query_ctx->_task_scheduler = _task_scheduler.get();
_runtime_state.clear();
_context.clear();
_fragment_id = 0;
_ins_id = TUniqueId();
_next_node_idx = 0;
_next_op_idx = 0;
_pipelines.clear();
_pipeline_profiles.clear();
_pipeline_tasks.clear();
_runtime_states.clear();
_runtime_filter_mgrs.clear();
}
int _next_fragment_id() { return _fragment_id++; }
int _next_node_id() { return _next_node_idx++; }
int _next_op_id() { return _next_op_idx--; }
int _next_sink_op_id() { return _next_sink_op_idx--; }
int _next_pipeline_id() { return _next_pipeline_idx++; }
int _max_operator_id() const { return _next_op_idx; }
// Query level
std::shared_ptr<ObjectPool> _obj_pool;
std::unique_ptr<doris::vectorized::VDataStreamMgr> _mgr;
std::shared_ptr<QueryContext> _query_ctx;
TUniqueId _query_id = TUniqueId();
TQueryOptions _query_options;
std::unique_ptr<MockTaskScheduler> _task_scheduler;
// Fragment level
// Fragment0 -> Fragment1
std::vector<std::unique_ptr<RuntimeState>> _runtime_state;
std::vector<std::shared_ptr<PipelineFragmentContext>> _context;
int _fragment_id = 0;
int _next_pipeline_idx = 0;
TUniqueId _ins_id = TUniqueId();
int _next_node_idx = 0;
int _next_op_idx = 0;
int _next_sink_op_idx = 0;
// Pipeline Level
// Fragment0[Pipeline0 -> Pipeline1] -> Fragment1[Pipeline2 -> Pipeline3]
std::vector<std::shared_ptr<Pipeline>> _pipelines;
std::vector<std::shared_ptr<RuntimeProfile>> _pipeline_profiles;
// Task Level
// Fragment0[Pipeline0[Task0] -> Pipeline1[Task0]] -> Fragment1[Pipeline2[Task0] -> Pipeline3[Task0]]
std::vector<std::vector<std::shared_ptr<PipelineTask>>> _pipeline_tasks;
std::vector<std::vector<std::unique_ptr<RuntimeState>>> _runtime_states;
// Instance level
std::vector<std::unique_ptr<RuntimeFilterMgr>> _runtime_filter_mgrs;
const std::string LOCALHOST = BackendOptions::get_localhost();
const int DUMMY_PORT = config::brpc_port;
};
// End-to-end happy path over a single hand-built pipeline:
// Pipeline(ExchangeOperator(id=0, UNPARTITIONED) -> ExchangeSinkOperatorX(id=1, UNPARTITIONED))
// Drives one PipelineTask manually through its dependency state machine:
// blocked on the FE start dependency, then alternately on the exchange read
// dependency (empty sender queue) and write dependency (full receiver queue),
// and finally EOS + close once the sender count drops to zero.
TEST_F(PipelineTest, HAPPY_PATH) {
    // Pipeline(ExchangeOperator(id=0, UNPARTITIONED) -> ExchangeSinkOperatorX(id=1, UNPARTITIONED))
    int parallelism = 1;
    // Build pipeline
    DescriptorTbl* desc;
    OperatorPtr op;
    _build_fragment_context();
    auto cur_pipe = _build_pipeline(parallelism);
    {
        // Source operator: UNPARTITIONED exchange producing one INT slot
        // (tuple 0 / slot 0, "test_column0").
        auto tnode = TPlanNodeBuilder(_next_node_id(), TPlanNodeType::EXCHANGE_NODE)
                             .set_exchange_node(
                                     TExchangeNodeBuilder()
                                             .set_partition_type(TPartitionType::UNPARTITIONED)
                                             .append_input_row_tuples(0)
                                             .build())
                             .append_row_tuples(0, false)
                             .build();
        TTupleDescriptor tuple0 = TTupleDescriptorBuilder().set_id(0).build();
        TSlotDescriptor slot0 =
                TSlotDescriptorBuilder()
                        .set_id(0)
                        .set_parent(tuple0)
                        .set_slotType(
                                TTypeDescBuilder()
                                        .set_types(TTypeNodeBuilder()
                                                           .set_type(TTypeNodeType::SCALAR)
                                                           .set_scalar_type(TPrimitiveType::INT)
                                                           .build())
                                        .build())
                        .set_nullIndicatorBit(-1)
                        .set_byteOffset(0)
                        .set_slotIdx(0)
                        .set_colName("test_column0")
                        .build();
        TDescriptorTable desc_table = TDescriptorTableBuilder()
                                              .append_slotDescriptors(slot0)
                                              .append_tupleDescriptors(tuple0)
                                              .build();
        EXPECT_EQ(DescriptorTbl::create(_obj_pool.get(), desc_table, &desc), Status::OK());
        op.reset(new ExchangeSourceOperatorX(_obj_pool.get(), tnode, _next_op_id(), *desc, 1));
        EXPECT_EQ(op->init(tnode, _runtime_state.back().get()), Status::OK());
        auto& exchange_operator = op->cast<ExchangeSourceOperatorX>();
        EXPECT_EQ(exchange_operator._is_merging, false);
        EXPECT_EQ(cur_pipe->add_operator(op, 0), Status::OK());
    }
    TDataSink tsink;
    // 0-1: instance id of this (sending) fragment instance.
    auto source_ins = _next_ins_id();
    // 0-2: instance id of the downstream receiver.
    auto dest0 = _next_ins_id();
    {
        // Sink operator: UNPARTITIONED exchange sink with a single local
        // destination (dest0 on this host), so the transfer stays in-process.
        std::vector<TPlanFragmentDestination> destinations;
        auto dest0_address = TNetworkAddress();
        dest0_address.hostname = LOCALHOST;
        dest0_address.port = DUMMY_PORT;
        destinations.push_back(
                TPlanFragmentDestinationBuilder(dest0, dest0_address, dest0_address).build());
        auto stream_sink =
                TDataStreamSinkBuilder(_next_node_id(),
                                       TDataPartitionBuilder(TPartitionType::UNPARTITIONED).build())
                        .build();
        tsink = TDataSinkBuilder(TDataSinkType::DATA_STREAM_SINK)
                        .set_stream_sink(stream_sink)
                        .build();
        DataSinkOperatorPtr sink;
        std::vector<TUniqueId> ids;
        ids.push_back(source_ins);
        sink.reset(new ExchangeSinkOperatorX(_runtime_state.back().get(), op->row_desc(),
                                             _next_op_id(), stream_sink, destinations, ids));
        EXPECT_EQ(sink->init(tsink), Status::OK());
        EXPECT_EQ(cur_pipe->set_sink(sink), Status::OK());
        EXPECT_EQ(cur_pipe->sink()->set_child(cur_pipe->operators().back()), Status::OK());
        EXPECT_EQ(cur_pipe->prepare(_runtime_state.back().get()), Status::OK());
        EXPECT_EQ(cur_pipe->num_tasks(), parallelism);
    }
    {
        // Build pipeline task
        int task_id = 0;
        std::unique_ptr<RuntimeState> local_runtime_state = RuntimeState::create_unique(
                source_ins, _query_id, _context.back()->get_fragment_id(), _query_options,
                _query_ctx->query_globals, ExecEnv::GetInstance(), _query_ctx.get());
        local_runtime_state->set_desc_tbl(desc);
        local_runtime_state->set_per_fragment_instance_idx(0);
        local_runtime_state->set_num_per_fragment_instances(parallelism);
        local_runtime_state->resize_op_id_to_local_state(-1);
        local_runtime_state->set_max_operator_id(-1);
        local_runtime_state->set_load_stream_per_node(0);
        local_runtime_state->set_total_load_streams(0);
        local_runtime_state->set_num_local_sink(0);
        local_runtime_state->set_task_id(task_id);
        local_runtime_state->set_task_num(cur_pipe->num_tasks());
        local_runtime_state->set_task_execution_context(
                std::static_pointer_cast<TaskExecutionContext>(_context.back()));
        _pipeline_profiles[cur_pipe->id()] =
                std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(cur_pipe->id()));
        std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
                                std::vector<std::shared_ptr<Dependency>>>>
                shared_state_map;
        auto task = std::make_unique<PipelineTask>(
                cur_pipe, task_id, local_runtime_state.get(), _context.back(),
                _pipeline_profiles[cur_pipe->id()].get(), shared_state_map, task_id);
        cur_pipe->incr_created_tasks(task_id, task.get());
        _pipeline_tasks[cur_pipe->id()].push_back(std::move(task));
        _runtime_states[cur_pipe->id()].push_back(std::move(local_runtime_state));
    }
    // Downstream fragment context + runtime state owning the receiving end.
    auto context = _build_fragment_context();
    std::unique_ptr<RuntimeState> downstream_runtime_state = RuntimeState::create_unique(
            dest0, _query_id, _next_fragment_id(), _query_options, _query_ctx->query_globals,
            ExecEnv::GetInstance(), _query_ctx.get());
    downstream_runtime_state->set_task_execution_context(
            std::static_pointer_cast<TaskExecutionContext>(context));
    auto downstream_pipeline_profile =
            std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(cur_pipe->id()));
    auto* memory_used_counter = downstream_pipeline_profile->AddHighWaterMarkCounter(
            "MemoryUsage", TUnit::BYTES, "", 1);
    // Construct input block
    vectorized::Block block;
    {
        vectorized::DataTypePtr int_type = std::make_shared<vectorized::DataTypeInt32>();
        auto int_col0 = vectorized::ColumnInt32::create();
        int_col0->insert_many_vals(1, 10);
        block.insert({std::move(int_col0), int_type, "test_int_col0"});
    }
    auto block_mem_usage = block.allocated_bytes();
    EXPECT_GT(block_mem_usage - 1, 0);
    // Receiver is capped just below one block's size so a single queued block
    // saturates the queue — this is what later blocks the write dependency.
    auto downstream_recvr = ExecEnv::GetInstance()->_vstream_mgr->create_recvr(
            downstream_runtime_state.get(), memory_used_counter, dest0, 1, 1,
            downstream_pipeline_profile.get(), false, block_mem_usage - 1);
    std::vector<TScanRangeParams> scan_ranges;
    EXPECT_EQ(_pipeline_tasks[cur_pipe->id()].back()->prepare(scan_ranges, 0, tsink), Status::OK());
    auto& local_state = _runtime_states.back()
                                .front()
                                ->get_local_state(op->operator_id())
                                ->cast<ExchangeLocalState>();
    auto& sink_local_state =
            _runtime_states.back().front()->get_sink_local_state()->cast<ExchangeSinkLocalState>();
    EXPECT_EQ(sink_local_state.channels.size(), 1);
    EXPECT_EQ(sink_local_state._only_local_exchange, true);
    EXPECT_EQ(local_state.stream_recvr->sender_queues().size(), 1);
    // Blocked by execution dependency which is set by FE 2-phase trigger.
    EXPECT_EQ(_pipeline_tasks[cur_pipe->id()].back()->_wait_to_start(), true);
    _query_ctx->get_execution_dependency()->set_ready();
    // Task is ready and is pushed into the runnable task queue.
    EXPECT_EQ(((MockTaskScheduler*)_query_ctx->_task_scheduler)->_task_queue->take(0) != nullptr,
              true);
    EXPECT_EQ(_pipeline_tasks[cur_pipe->id()].back()->_wait_to_start(), false);
    bool eos = false;
    auto read_deps = local_state.dependencies();
    {
        // Blocked by exchange read dependency due to no data reached.
        EXPECT_EQ(_pipeline_tasks[cur_pipe->id()].back()->execute(&eos), Status::OK());
        EXPECT_EQ(_pipeline_tasks[cur_pipe->id()].back()->_opened, true);
        EXPECT_EQ(_pipeline_tasks[cur_pipe->id()].back()->_wake_up_early ||
                          _pipeline_tasks[cur_pipe->id()].back()->_eos || eos,
                  false);
    }
    EXPECT_EQ(sink_local_state.channels[0]->_local_recvr != nullptr, true);
    EXPECT_EQ(std::all_of(read_deps.cbegin(), read_deps.cend(),
                          [](const auto& dep) { return dep->ready(); }),
              false);
    local_state.stream_recvr->_sender_queues[0]->add_block(&block, true);
    // add_block takes ownership of the columns, leaving `block` empty.
    EXPECT_EQ(block.columns(), 0);
    // Task is ready since a new block reached.
    EXPECT_EQ(((MockTaskScheduler*)_query_ctx->_task_scheduler)->_task_queue->take(0) != nullptr,
              true);
    EXPECT_EQ(std::all_of(read_deps.cbegin(), read_deps.cend(),
                          [](const auto& dep) { return dep->ready(); }),
              true);
    {
        // Consumes the queued block; afterwards the read dependency is
        // not-ready again because the sender queue has been drained.
        EXPECT_EQ(_pipeline_tasks[cur_pipe->id()].back()->execute(&eos), Status::OK());
        EXPECT_EQ(std::all_of(read_deps.cbegin(), read_deps.cend(),
                              [](const auto& dep) { return dep->ready(); }),
                  false);
        EXPECT_EQ(_pipeline_tasks[cur_pipe->id()].back()->_wake_up_early ||
                          _pipeline_tasks[cur_pipe->id()].back()->_eos || eos,
                  false);
    }
    // Refill the (now empty) block and enqueue it again.
    {
        vectorized::DataTypePtr int_type = std::make_shared<vectorized::DataTypeInt32>();
        auto int_col0 = vectorized::ColumnInt32::create();
        int_col0->insert_many_vals(1, 10);
        block.insert({std::move(int_col0), int_type, "test_int_col0"});
    }
    block_mem_usage = block.allocated_bytes();
    EXPECT_GT(block_mem_usage - 1, 0);
    local_state.stream_recvr->_sender_queues[0]->add_block(&block, true);
    EXPECT_EQ(block.columns(), 0);
    // Task is ready since a new block reached.
    EXPECT_EQ(((MockTaskScheduler*)_query_ctx->_task_scheduler)->_task_queue->take(0) != nullptr,
              true);
    EXPECT_EQ(std::all_of(read_deps.cbegin(), read_deps.cend(),
                          [](const auto& dep) { return dep->ready(); }),
              true);
    auto write_dependencies = sink_local_state.dependencies();
    // Write dependency is blocked because each data queue is limited by block_mem_usage.
    EXPECT_EQ(std::all_of(write_dependencies.cbegin(), write_dependencies.cend(),
                          [](const auto& dep) { return dep->ready(); }),
              false);
    {
        // Blocked by exchange write dependency due to full data queue.
        EXPECT_EQ(_pipeline_tasks[cur_pipe->id()].back()->execute(&eos), Status::OK());
        EXPECT_EQ(std::all_of(read_deps.cbegin(), read_deps.cend(),
                              [](const auto& dep) { return dep->ready(); }),
                  true);
        EXPECT_EQ(_pipeline_tasks[cur_pipe->id()].back()->_wake_up_early ||
                          _pipeline_tasks[cur_pipe->id()].back()->_eos || eos,
                  false);
        EXPECT_EQ(std::all_of(write_dependencies.cbegin(), write_dependencies.cend(),
                              [](const auto& dep) { return dep->ready(); }),
                  false);
    }
    {
        // Exactly one block is sitting in the downstream receiver's queue.
        EXPECT_EQ(downstream_recvr->_sender_queues[0]->_block_queue.size(), 1);
        EXPECT_EQ(downstream_recvr->_sender_queues[0]->_num_remaining_senders, 1);
    }
    {
        // Drain one block on the receiver side; this frees queue space and
        // re-readies the sink's write dependency.
        vectorized::Block tmp_block;
        bool tmp_eos = false;
        EXPECT_EQ(downstream_recvr->_sender_queues[0]->get_batch(&tmp_block, &tmp_eos),
                  Status::OK());
        EXPECT_EQ(tmp_eos, false);
        EXPECT_EQ(tmp_block.rows(), 10);
        EXPECT_EQ(std::all_of(write_dependencies.cbegin(), write_dependencies.cend(),
                              [](const auto& dep) { return dep->ready(); }),
                  true);
        EXPECT_EQ(downstream_recvr->_sender_queues[0]->_block_queue.size(), 0);
        EXPECT_EQ(downstream_recvr->_sender_queues[0]->_num_remaining_senders, 1);
    }
    {
        // Second block is sent downstream; the receiver queue fills up again,
        // so both the read and write dependencies end up not-ready.
        EXPECT_EQ(_pipeline_tasks[cur_pipe->id()].back()->execute(&eos), Status::OK());
        EXPECT_EQ(std::all_of(read_deps.cbegin(), read_deps.cend(),
                              [](const auto& dep) { return dep->ready(); }),
                  false);
        EXPECT_EQ(_pipeline_tasks[cur_pipe->id()].back()->_wake_up_early ||
                          _pipeline_tasks[cur_pipe->id()].back()->_eos || eos,
                  false);
        EXPECT_EQ(std::all_of(write_dependencies.cbegin(), write_dependencies.cend(),
                              [](const auto& dep) { return dep->ready(); }),
                  false);
    }
    {
        // Drain the second block.
        vectorized::Block tmp_block;
        bool tmp_eos = false;
        EXPECT_EQ(downstream_recvr->_sender_queues[0]->get_batch(&tmp_block, &tmp_eos),
                  Status::OK());
        EXPECT_EQ(tmp_eos, false);
        EXPECT_EQ(tmp_block.rows(), 10);
        EXPECT_EQ(std::all_of(write_dependencies.cbegin(), write_dependencies.cend(),
                              [](const auto& dep) { return dep->ready(); }),
                  true);
        EXPECT_EQ(downstream_recvr->_sender_queues[0]->_block_queue.size(), 0);
        EXPECT_EQ(downstream_recvr->_sender_queues[0]->_num_remaining_senders, 1);
    }
    // Upstream task finished.
    local_state.stream_recvr->_sender_queues[0]->decrement_senders(0);
    EXPECT_EQ(((MockTaskScheduler*)_query_ctx->_task_scheduler)->_task_queue->take(0) != nullptr,
              true);
    {
        // All senders closed: the source reports EOS and the task can close.
        EXPECT_EQ(_pipeline_tasks[cur_pipe->id()].back()->execute(&eos), Status::OK());
        EXPECT_EQ(std::all_of(read_deps.cbegin(), read_deps.cend(),
                              [](const auto& dep) { return dep->ready(); }),
                  true);
        EXPECT_EQ(eos, true);
        EXPECT_EQ(_pipeline_tasks[cur_pipe->id()].back()->_is_pending_finish(), false);
        EXPECT_EQ(_pipeline_tasks[cur_pipe->id()].back()->close(Status::OK()), Status::OK());
    }
    {
        EXPECT_EQ(downstream_recvr->_sender_queues[0]->_block_queue.size(), 0);
        EXPECT_EQ(downstream_recvr->_sender_queues[0]->_num_remaining_senders, 0);
    }
    downstream_recvr->close();
}
// Planning-only test for local-exchange decisions:
// Pipeline(ExchangeOperator(id=0, HASH_PARTITIONED) -> ExchangeSinkOperatorX(id=1, UNPARTITIONED))
// Verifies that a hash-partitioned source needs no local exchange for a NOOP
// sink requirement, but a serial source forces PASSTHROUGH and therefore does.
TEST_F(PipelineTest, PLAN_LOCAL_EXCHANGE) {
    _reset();
    // Pipeline(ExchangeOperator(id=0, HASH_PARTITIONED) -> ExchangeSinkOperatorX(id=1, UNPARTITIONED))
    int parallelism = 2;
    // Build pipeline
    DescriptorTbl* desc;
    OperatorPtr op;
    _build_fragment_context();
    EXPECT_EQ(_runtime_state.front()->enable_local_shuffle(), true);
    auto cur_pipe = _build_pipeline(parallelism);
    {
        // Source operator: HASH_PARTITIONED exchange producing one INT slot.
        auto tnode = TPlanNodeBuilder(_next_node_id(), TPlanNodeType::EXCHANGE_NODE)
                             .set_exchange_node(
                                     TExchangeNodeBuilder()
                                             .set_partition_type(TPartitionType::HASH_PARTITIONED)
                                             .append_input_row_tuples(0)
                                             .build())
                             .append_row_tuples(0, false)
                             .build();
        TTupleDescriptor tuple0 = TTupleDescriptorBuilder().set_id(0).build();
        TSlotDescriptor slot0 =
                TSlotDescriptorBuilder()
                        .set_id(0)
                        .set_parent(tuple0)
                        .set_slotType(
                                TTypeDescBuilder()
                                        .set_types(TTypeNodeBuilder()
                                                           .set_type(TTypeNodeType::SCALAR)
                                                           .set_scalar_type(TPrimitiveType::INT)
                                                           .build())
                                        .build())
                        .set_nullIndicatorBit(-1)
                        .set_byteOffset(0)
                        .set_slotIdx(0)
                        .set_colName("test_column0")
                        .build();
        TDescriptorTable desc_table = TDescriptorTableBuilder()
                                              .append_slotDescriptors(slot0)
                                              .append_tupleDescriptors(tuple0)
                                              .build();
        EXPECT_EQ(DescriptorTbl::create(_obj_pool.get(), desc_table, &desc), Status::OK());
        op.reset(new ExchangeSourceOperatorX(_obj_pool.get(), tnode, _next_op_id(), *desc, 1));
        EXPECT_EQ(op->init(tnode, _runtime_state.back().get()), Status::OK());
        auto& exchange_operator = op->cast<ExchangeSourceOperatorX>();
        EXPECT_EQ(exchange_operator._is_merging, false);
        EXPECT_EQ(cur_pipe->add_operator(op, 0), Status::OK());
    }
    TDataSink tsink;
    // 0-1: instance id of this (sending) fragment instance.
    auto source_ins = _next_ins_id();
    // 0-2: instance id of the downstream receiver.
    auto dest0 = _next_ins_id();
    {
        // Sink operator: UNPARTITIONED exchange sink with one destination.
        std::vector<TPlanFragmentDestination> destinations;
        auto dest0_address = TNetworkAddress();
        dest0_address.hostname = LOCALHOST;
        dest0_address.port = DUMMY_PORT;
        destinations.push_back(
                TPlanFragmentDestinationBuilder(dest0, dest0_address, dest0_address).build());
        auto stream_sink =
                TDataStreamSinkBuilder(_next_node_id(),
                                       TDataPartitionBuilder(TPartitionType::UNPARTITIONED).build())
                        .build();
        tsink = TDataSinkBuilder(TDataSinkType::DATA_STREAM_SINK)
                        .set_stream_sink(stream_sink)
                        .build();
        DataSinkOperatorPtr sink;
        std::vector<TUniqueId> ids;
        ids.push_back(source_ins);
        sink.reset(new ExchangeSinkOperatorX(_runtime_state.back().get(), op->row_desc(),
                                             _next_op_id(), stream_sink, destinations, ids));
        EXPECT_EQ(sink->init(tsink), Status::OK());
        EXPECT_EQ(cur_pipe->set_sink(sink), Status::OK());
        EXPECT_EQ(cur_pipe->sink()->set_child(cur_pipe->operators().back()), Status::OK());
        EXPECT_EQ(cur_pipe->num_tasks(), parallelism);
    }
    {
        // Hash-partitioned source: pipeline distribution is HASH_SHUFFLE, the
        // sink only requires NOOP, so no local exchange is planned.
        cur_pipe->init_data_distribution(_runtime_state.back().get());
        EXPECT_EQ(cur_pipe->data_distribution().distribution_type, ExchangeType::HASH_SHUFFLE);
        EXPECT_EQ(cur_pipe->sink()
                          ->required_data_distribution(_runtime_state.back().get())
                          .distribution_type,
                  ExchangeType::NOOP);
        EXPECT_EQ(cur_pipe->need_to_local_exchange(
                          cur_pipe->sink()->required_data_distribution(_runtime_state.back().get()),
                          1),
                  false);
    }
    {
        // Serial source: distribution degrades to NOOP and the sink now
        // requires PASSTHROUGH, so a local exchange becomes necessary.
        cur_pipe->operators().front()->set_serial_operator();
        cur_pipe->init_data_distribution(_runtime_state.back().get());
        EXPECT_EQ(cur_pipe->data_distribution().distribution_type, ExchangeType::NOOP);
        EXPECT_EQ(cur_pipe->sink()
                          ->required_data_distribution(_runtime_state.back().get())
                          .distribution_type,
                  ExchangeType::PASSTHROUGH);
        EXPECT_EQ(cur_pipe->need_to_local_exchange(
                          cur_pipe->sink()->required_data_distribution(_runtime_state.back().get()),
                          1),
                  true);
    }
}
TEST_F(PipelineTest, PLAN_HASH_JOIN) {
_reset();
/**
* Pipeline(ExchangeOperator(id=1, HASH_PARTITIONED) -> HashJoinBuildOperator(id=0))
* Pipeline(ExchangeOperator(id=2, HASH_PARTITIONED) -> HashJoinProbeOperator(id=0, UNPARTITIONED) -> ExchangeSinkOperatorX(id=3, UNPARTITIONED))
*/
int parallelism = 2;
// Build pipeline
DescriptorTbl* desc;
_build_fragment_context();
EXPECT_EQ(_runtime_state.front()->enable_local_shuffle(), true);
{
TTupleDescriptor tuple0 = TTupleDescriptorBuilder().set_id(0).build();
TSlotDescriptor slot0 =
TSlotDescriptorBuilder()
.set_id(0)
.set_parent(tuple0)
.set_slotType(
TTypeDescBuilder()
.set_types(TTypeNodeBuilder()
.set_type(TTypeNodeType::SCALAR)
.set_scalar_type(TPrimitiveType::INT)
.build())
.build())
.set_nullIndicatorBit(-1)
.set_byteOffset(0)
.set_slotIdx(0)
.set_colName("test_column0")
.build();
TTupleDescriptor tuple1 = TTupleDescriptorBuilder().set_id(1).build();
TSlotDescriptor slot1 =
TSlotDescriptorBuilder()
.set_id(1)
.set_parent(tuple1)
.set_slotType(
TTypeDescBuilder()
.set_types(TTypeNodeBuilder()
.set_type(TTypeNodeType::SCALAR)
.set_scalar_type(TPrimitiveType::INT)
.build())
.build())
.set_nullIndicatorBit(-1)
.set_byteOffset(0)
.set_slotIdx(0)
.set_colName("test_column1")
.build();
TTupleDescriptor tuple2 = TTupleDescriptorBuilder().set_id(2).build();
TSlotDescriptor slot2 =
TSlotDescriptorBuilder()
.set_id(2)
.set_parent(tuple2)
.set_slotType(
TTypeDescBuilder()
.set_types(TTypeNodeBuilder()
.set_type(TTypeNodeType::SCALAR)
.set_scalar_type(TPrimitiveType::INT)
.build())
.build())
.set_nullIndicatorBit(-1)
.set_byteOffset(0)
.set_slotIdx(0)
.set_colName("test_column0")
.build();
TSlotDescriptor slot3 =
TSlotDescriptorBuilder()
.set_id(3)
.set_parent(tuple2)
.set_slotType(
TTypeDescBuilder()
.set_types(TTypeNodeBuilder()
.set_type(TTypeNodeType::SCALAR)
.set_scalar_type(TPrimitiveType::INT)
.build())
.build())
.set_nullIndicatorBit(-1)
.set_byteOffset(4)
.set_slotIdx(1)
.set_colName("test_column1")
.build();
TDescriptorTable desc_table = TDescriptorTableBuilder()
.append_slotDescriptors(slot0)
.append_tupleDescriptors(tuple0)
.append_slotDescriptors(slot1)
.append_tupleDescriptors(tuple1)
.append_slotDescriptors(slot2)
.append_slotDescriptors(slot3)
.append_tupleDescriptors(tuple2)
.build();
EXPECT_EQ(DescriptorTbl::create(_obj_pool.get(), desc_table, &desc), Status::OK());
_runtime_state.back()->set_desc_tbl(desc);
}
auto join_node =
TPlanNodeBuilder(_next_node_id(), TPlanNodeType::HASH_JOIN_NODE)
.set_output_tuple_id(2)
.set_hash_join_node(
THashJoinNodeBuilder(
TJoinOp::INNER_JOIN,
std::vector<TEqJoinCondition> {
TEqJoinConditionBuilder(
TExprBuilder()
.append_nodes(
TExprNodeBuilder(
TExprNodeType::SLOT_REF,
TTypeDescBuilder()
.set_types(
TTypeNodeBuilder()
.set_type(
TTypeNodeType::
SCALAR)
.set_scalar_type(
TPrimitiveType::
INT)
.build())
.build(),
0)
.set_slot_ref(
TSlotRefBuilder(
0, 0)
.build())
.build())
.build(),
TExprBuilder()
.append_nodes(
TExprNodeBuilder(
TExprNodeType::SLOT_REF,
TTypeDescBuilder()
.set_types(
TTypeNodeBuilder()
.set_type(
TTypeNodeType::
SCALAR)
.set_scalar_type(
TPrimitiveType::
INT)
.build())
.build(),
0)
.set_slot_ref(
TSlotRefBuilder(
1, 1)
.build())
.build())
.build())
.build()})
.set_is_broadcast_join(false)
.set_dist_type(TJoinDistributionType::PARTITIONED)
.append_vintermediate_tuple_id_list(0)
.append_vintermediate_tuple_id_list(1)
.build())
.append_row_tuples(2, false)
.append_projections(
TExprBuilder()
.append_nodes(
TExprNodeBuilder(
TExprNodeType::SLOT_REF,
TTypeDescBuilder()
.set_types(
TTypeNodeBuilder()
.set_type(
TTypeNodeType::
SCALAR)
.set_scalar_type(
TPrimitiveType::
INT)
.build())
.build(),
0)
.set_slot_ref(TSlotRefBuilder(0, 0).build())
.build())
.build())
.append_projections(
TExprBuilder()
.append_nodes(
TExprNodeBuilder(
TExprNodeType::SLOT_REF,
TTypeDescBuilder()
.set_types(
TTypeNodeBuilder()
.set_type(
TTypeNodeType::
SCALAR)
.set_scalar_type(
TPrimitiveType::
INT)
.build())
.build(),
0)
.set_slot_ref(TSlotRefBuilder(1, 1).build())
.build())
.build())
.append_runtime_filters(TRuntimeFilterDescBuilder()
.set_bloom_filter_size_bytes(1048576)
.set_build_bf_by_runtime_size(false)
.build())
.build();
{
auto probe_side_pipe = _build_pipeline(parallelism);
OperatorPtr op;
op.reset(new HashJoinProbeOperatorX(_obj_pool.get(), join_node, _next_op_id(), *desc));
EXPECT_EQ(op->init(join_node, _runtime_state.back().get()), Status::OK());
EXPECT_EQ(probe_side_pipe->add_operator(op, 0), Status::OK());
auto tnode = TPlanNodeBuilder(_next_node_id(), TPlanNodeType::EXCHANGE_NODE)
.set_exchange_node(
TExchangeNodeBuilder()
.set_partition_type(TPartitionType::HASH_PARTITIONED)
.append_input_row_tuples(0)
.build())
.append_row_tuples(0, false)
.build();
op.reset(new ExchangeSourceOperatorX(_obj_pool.get(), tnode, _next_op_id(), *desc, 1));
EXPECT_EQ(op->init(tnode, _runtime_state.back().get()), Status::OK());
auto& exchange_operator = op->cast<ExchangeSourceOperatorX>();
EXPECT_EQ(exchange_operator._is_merging, false);
EXPECT_EQ(probe_side_pipe->operators().front()->set_child(op), Status::OK());
EXPECT_EQ(probe_side_pipe->add_operator(op, 0), Status::OK());
}
{
auto build_side_pipe = _build_pipeline(parallelism, _pipelines.front().get());
DataSinkOperatorPtr sink;
sink.reset(new HashJoinBuildSinkOperatorX(
_obj_pool.get(), _next_sink_op_id(),
_pipelines.front()->operators().back()->operator_id(), join_node, *desc));
EXPECT_EQ(sink->init(join_node, _runtime_state.back().get()), Status::OK());
EXPECT_EQ(build_side_pipe->set_sink(sink), Status::OK());
auto tnode = TPlanNodeBuilder(_next_node_id(), TPlanNodeType::EXCHANGE_NODE)
.set_exchange_node(
TExchangeNodeBuilder()
.set_partition_type(TPartitionType::HASH_PARTITIONED)
.append_input_row_tuples(1)
.build())
.append_row_tuples(1, false)
.build();
OperatorPtr op;
op.reset(new ExchangeSourceOperatorX(_obj_pool.get(), tnode, _next_op_id(), *desc, 1));
EXPECT_EQ(op->init(tnode, _runtime_state.back().get()), Status::OK());
auto& exchange_operator = op->cast<ExchangeSourceOperatorX>();
EXPECT_EQ(exchange_operator._is_merging, false);
EXPECT_EQ(build_side_pipe->add_operator(op, 0), Status::OK());
EXPECT_EQ(build_side_pipe->num_tasks(), parallelism);
EXPECT_EQ(_pipelines.front()->operators().back()->set_child(op), Status::OK());
EXPECT_EQ(sink->set_child(op), Status::OK());
}
TDataSink tsink;
std::vector<TUniqueId> ids;
auto dest_ins_id = _next_ins_id();
auto dest_node_id = _next_node_id();
{
auto cur_pipe = _pipelines.front();
std::vector<TPlanFragmentDestination> destinations;
auto dest0_address = TNetworkAddress();
dest0_address.hostname = LOCALHOST;
dest0_address.port = DUMMY_PORT;
destinations.push_back(
TPlanFragmentDestinationBuilder(dest_ins_id, dest0_address, dest0_address).build());
auto stream_sink =
TDataStreamSinkBuilder(dest_node_id,
TDataPartitionBuilder(TPartitionType::UNPARTITIONED).build())
.build();
tsink = TDataSinkBuilder(TDataSinkType::DATA_STREAM_SINK)
.set_stream_sink(stream_sink)
.build();
DataSinkOperatorPtr sink;
for (int i = 0; i < parallelism; i++) {
ids.push_back(_next_ins_id());
}
sink.reset(new ExchangeSinkOperatorX(_runtime_state.back().get(),
_pipelines.back()->operators().back()->row_desc(),
_next_sink_op_id(), stream_sink, destinations, ids));
EXPECT_EQ(sink->init(tsink), Status::OK());
EXPECT_EQ(cur_pipe->set_sink(sink), Status::OK());
EXPECT_EQ(cur_pipe->sink()->set_child(cur_pipe->operators().back()), Status::OK());
EXPECT_EQ(cur_pipe->num_tasks(), parallelism);
}
for (int pip_idx = cast_set<int>(_pipelines.size()) - 1; pip_idx >= 0; pip_idx--) {
_pipelines[pip_idx]->init_data_distribution(_runtime_state.back().get());
if (pip_idx == 1) {
// Pipeline(ExchangeOperator(id=1, HASH_PARTITIONED) -> HashJoinBuildOperator(id=0))
EXPECT_EQ(_pipelines[pip_idx]->data_distribution().distribution_type,
ExchangeType::HASH_SHUFFLE);
EXPECT_EQ(_pipelines[pip_idx]
->sink()
->required_data_distribution(_runtime_state.back().get())
.distribution_type,
ExchangeType::HASH_SHUFFLE);
EXPECT_EQ(_pipelines[pip_idx]->need_to_local_exchange(
_pipelines[pip_idx]->sink()->required_data_distribution(
_runtime_state.back().get()),
1),
false);
} else {
// Pipeline(ExchangeOperator(id=2, HASH_PARTITIONED) -> HashJoinProbeOperator(id=0, UNPARTITIONED) -> ExchangeSinkOperatorX(id=3, UNPARTITIONED))
_pipelines[pip_idx]->set_data_distribution(
_pipelines[pip_idx]->children().front()->data_distribution());
EXPECT_EQ(_pipelines[pip_idx]->data_distribution().distribution_type,
ExchangeType::HASH_SHUFFLE);
EXPECT_EQ(_pipelines[pip_idx]->need_to_local_exchange(
_pipelines[pip_idx]->sink()->required_data_distribution(
_runtime_state.back().get()),
2),
false);
EXPECT_EQ(_pipelines[pip_idx]
->operators()
.back()
->required_data_distribution(_runtime_state.back().get())
.distribution_type,
ExchangeType::HASH_SHUFFLE);
EXPECT_EQ(_pipelines[pip_idx]->need_to_local_exchange(
_pipelines[pip_idx]->operators().back()->required_data_distribution(
_runtime_state.back().get()),
1),
false);
}
}
for (PipelinePtr& pipeline : _pipelines) {
pipeline->children().clear();
EXPECT_EQ(pipeline->prepare(_runtime_state.front().get()), Status::OK());
}
{
// Build pipeline task
int task_id = 0;
_runtime_filter_mgrs.resize(parallelism);
for (int j = 0; j < parallelism; j++) {
_runtime_filter_mgrs[j] = std::make_unique<RuntimeFilterMgr>(false);
}
for (size_t i = 0; i < _pipelines.size(); i++) {
EXPECT_EQ(_pipelines[i]->id(), i);
_pipeline_profiles[_pipelines[i]->id()] = std::make_shared<RuntimeProfile>(
"Pipeline : " + std::to_string(_pipelines[i]->id()));
for (int j = 0; j < parallelism; j++) {
std::unique_ptr<RuntimeState> local_runtime_state = RuntimeState::create_unique(
ids[j], _query_id, _context.back()->get_fragment_id(), _query_options,
_query_ctx->query_globals, ExecEnv::GetInstance(), _query_ctx.get());
local_runtime_state->set_desc_tbl(desc);
local_runtime_state->set_per_fragment_instance_idx(j);
local_runtime_state->set_be_number(j);
local_runtime_state->set_num_per_fragment_instances(parallelism);
local_runtime_state->resize_op_id_to_local_state(_max_operator_id());
local_runtime_state->set_max_operator_id(_max_operator_id());
local_runtime_state->set_load_stream_per_node(0);
local_runtime_state->set_total_load_streams(0);
local_runtime_state->set_num_local_sink(0);
local_runtime_state->set_task_id(task_id++);
local_runtime_state->set_task_num(_pipelines[i]->num_tasks());
local_runtime_state->set_task_execution_context(
std::static_pointer_cast<TaskExecutionContext>(_context.back()));
local_runtime_state->set_runtime_filter_mgr(_runtime_filter_mgrs[j].get());
std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
std::vector<std::shared_ptr<Dependency>>>>
shared_state_map;
auto task = std::make_unique<PipelineTask>(
_pipelines[i], task_id, local_runtime_state.get(), _context.back(),
_pipeline_profiles[_pipelines[i]->id()].get(), shared_state_map, j);
_pipelines[i]->incr_created_tasks(j, task.get());
_pipeline_tasks[_pipelines[i]->id()].push_back(std::move(task));
_runtime_states[_pipelines[i]->id()].push_back(std::move(local_runtime_state));
}
}
}
{
_pipeline_tasks[0][0]->inject_shared_state(_pipeline_tasks[1][0]->get_sink_shared_state());
_pipeline_tasks[0][1]->inject_shared_state(_pipeline_tasks[1][1]->get_sink_shared_state());
}
std::shared_ptr<vectorized::VDataStreamRecvr> downstream_recvr;
auto downstream_pipeline_profile = std::make_shared<RuntimeProfile>("Downstream Pipeline");
{
// Build downstream recvr
auto context = _build_fragment_context();
std::unique_ptr<RuntimeState> downstream_runtime_state = RuntimeState::create_unique(
dest_ins_id, _query_id, context->get_fragment_id(), _query_options,
_query_ctx->query_globals, ExecEnv::GetInstance(), _query_ctx.get());
downstream_runtime_state->set_task_execution_context(
std::static_pointer_cast<TaskExecutionContext>(context));
auto* memory_used_counter = downstream_pipeline_profile->AddHighWaterMarkCounter(
"MemoryUsage", TUnit::BYTES, "", 1);
downstream_recvr = ExecEnv::GetInstance()->_vstream_mgr->create_recvr(
downstream_runtime_state.get(), memory_used_counter, dest_ins_id, dest_node_id,
parallelism, downstream_pipeline_profile.get(), false, 2048000);
}
for (size_t i = 0; i < _pipelines.size(); i++) {
for (int j = 0; j < parallelism; j++) {
std::vector<TScanRangeParams> scan_ranges;
EXPECT_EQ(_pipeline_tasks[_pipelines[i]->id()][j]->prepare(scan_ranges, j, tsink),
Status::OK());
if (i == 1) {
auto& local_state = _runtime_states[i][j]
->get_sink_local_state()
->cast<HashJoinBuildSinkLocalState>();
EXPECT_EQ(local_state._runtime_filter_producer_helper->_producers.size(), 1);
EXPECT_EQ(local_state._should_build_hash_table, true);
}
}
}
{
for (size_t i = 0; i < _pipelines.size(); i++) {
for (int j = 0; j < parallelism; j++) {
// Blocked by execution dependency which is set by FE 2-phase trigger.
EXPECT_EQ(_pipeline_tasks[_pipelines[i]->id()][j]->_wait_to_start(), true);
}
}
EXPECT_EQ(_query_ctx->get_execution_dependency()->_blocked_task.size(),
_pipelines.size() * parallelism);
_query_ctx->get_execution_dependency()->set_ready();
for (size_t i = 0; i < _pipelines.size(); i++) {
for (int j = 0; j < parallelism; j++) {
// Task is ready and be push into runnable task queue.
EXPECT_EQ(((MockTaskScheduler*)_query_ctx->_task_scheduler)->_task_queue->take(0) !=
nullptr,
true);
}
}
for (size_t i = 0; i < _pipelines.size(); i++) {
for (int j = 0; j < parallelism; j++) {
EXPECT_EQ(_pipeline_tasks[_pipelines[i]->id()][j]->_wait_to_start(), false);
}
}
}
{
EXPECT_EQ(downstream_recvr->_sender_queues[0]->_block_queue.size(), 0);
EXPECT_EQ(downstream_recvr->_sender_queues[0]->_num_remaining_senders, 2);
}
{
for (int i = _pipelines.size() - 1; i >= 0; i--) {
for (int j = 0; j < parallelism; j++) {
bool eos = false;
EXPECT_EQ(_pipeline_tasks[i][j]->execute(&eos), Status::OK());
EXPECT_EQ(_pipeline_tasks[i][j]->_opened, true);
EXPECT_EQ(eos, false);
}
}
}
for (int i = _pipelines.size() - 1; i >= 0; i--) {
for (int j = 0; j < parallelism; j++) {
{
vectorized::Block block;
{
vectorized::DataTypePtr int_type =
std::make_shared<vectorized::DataTypeInt32>();
auto int_col0 = vectorized::ColumnInt32::create();
if (j == 0 || i == 0) {
int_col0->insert_many_vals(j, 10);
} else {
size_t ndv = 16;
for (size_t n = 0; n < ndv; n++) {
int_col0->insert_many_vals(n, 1);
}
}
block.insert({std::move(int_col0), int_type, "test_int_col0"});
}
auto& local_state =
_runtime_states[i][j]
->get_local_state(_pipelines[i]->operators().front()->operator_id())
->cast<ExchangeLocalState>();
EXPECT_EQ(local_state.stream_recvr->_sender_queues[0]->_source_dependency->ready(),
false);
EXPECT_EQ(local_state.stream_recvr->_sender_queues[0]
->_source_dependency->_blocked_task.size(),
i == 1 ? 1 : 0);
local_state.stream_recvr->_sender_queues[0]->add_block(&block, true);
}
}
}
{
// Pipeline 1 is blocked by exchange dependency so tasks are ready after data reached.
// Pipeline 0 is blocked by hash join dependency and is still waiting for upstream tasks done.
for (int j = 0; j < parallelism; j++) {
// Task is ready and be push into runnable task queue.
EXPECT_EQ(((MockTaskScheduler*)_query_ctx->_task_scheduler)->_task_queue->take(0) !=
nullptr,
true);
}
EXPECT_EQ(((MockTaskScheduler*)_query_ctx->_task_scheduler)->_task_queue->take(0), nullptr);
for (int j = 0; j < parallelism; j++) {
EXPECT_EQ(_pipeline_tasks[1][j]->_is_blocked(), false);
}
}
{
// Pipeline 1 ran first and build hash table in join build operator.
for (int j = 0; j < parallelism; j++) {
bool eos = false;
EXPECT_EQ(_pipeline_tasks[1][j]->execute(&eos), Status::OK());
EXPECT_EQ(eos, false);
}
for (int j = 0; j < parallelism; j++) {
auto& local_state =
_runtime_states[1][j]
->get_local_state(_pipelines[1]->operators().front()->operator_id())
->cast<ExchangeLocalState>();
local_state.stream_recvr->_sender_queues[0]->decrement_senders(0);
bool eos = false;
EXPECT_EQ(_pipeline_tasks[1][j]->execute(&eos), Status::OK());
EXPECT_EQ(_pipeline_tasks[1][j]->_is_blocked(), false);
EXPECT_EQ(eos, true);
auto& sink_local_state = _runtime_states[1][j]
->get_sink_local_state()
->cast<HashJoinBuildSinkLocalState>();
EXPECT_EQ(
sink_local_state._runtime_filter_producer_helper->_skip_runtime_filters_process,
false);
EXPECT_EQ(sink_local_state._runtime_filter_producer_helper->_producers.size(), 1);
EXPECT_TRUE(
sink_local_state._runtime_filter_producer_helper->_producers[0]->_rf_state ==
RuntimeFilterProducer::State::WAITING_FOR_DATA);
EXPECT_EQ(sink_local_state._runtime_filter_producer_helper->_producers[0]
->_runtime_filter_type,
RuntimeFilterType::IN_OR_BLOOM_FILTER);
EXPECT_EQ(_pipeline_tasks[1][j]->_is_pending_finish(), false);
auto wrapper =
sink_local_state._runtime_filter_producer_helper->_producers[0]->_wrapper;
EXPECT_EQ(_pipeline_tasks[1][j]->close(Status::OK()), Status::OK());
EXPECT_EQ(wrapper->get_real_type(),
j == 0 ? RuntimeFilterType::IN_FILTER : RuntimeFilterType::BLOOM_FILTER)
<< " " << j << " "
<< sink_local_state._runtime_filter_producer_helper->_producers[0]
->debug_string();
EXPECT_TRUE(wrapper->_state == RuntimeFilterWrapper::State::READY);
if (j == 0) {
EXPECT_EQ(wrapper->_hybrid_set->size(), 1);
} else {
EXPECT_EQ(wrapper->_bloom_filter_func->build_bf_by_runtime_size(), false);
EXPECT_EQ(wrapper->_bloom_filter_func->_bloom_filter_length, 1048576);
}
}
}
{
// Pipeline 0 ran once hash table is built.
for (int j = 0; j < parallelism; j++) {
EXPECT_EQ(_pipeline_tasks[0][j]->_is_blocked(), false);
}
for (int j = 0; j < parallelism; j++) {
bool eos = false;
EXPECT_EQ(_pipeline_tasks[0][j]->execute(&eos), Status::OK());
EXPECT_EQ(eos, false);
}
for (int j = 0; j < parallelism; j++) {
auto& local_state =
_runtime_states[0][j]
->get_local_state(_pipelines[0]->operators().front()->operator_id())
->cast<ExchangeLocalState>();
local_state.stream_recvr->_sender_queues[0]->decrement_senders(0);
bool eos = false;
EXPECT_EQ(_pipeline_tasks[0][j]->execute(&eos), Status::OK());
EXPECT_EQ(_pipeline_tasks[0][j]->_is_blocked(), false);
EXPECT_EQ(eos, true);
EXPECT_EQ(_pipeline_tasks[0][j]->_is_pending_finish(), false);
EXPECT_EQ(_pipeline_tasks[0][j]->close(Status::OK()), Status::OK());
}
}
{
// [1, 1, 1, 1, 1, 1, 1, 1, 1, 1] join [1, 1, 1, 1, 1, 1, 1, 1, 1, 1] produces 100 rows in instance 0.
// [2, 2, 2, 2, 2, 2, 2, 2, 2, 2] join [2, 2, 2, 2, 2, 2, 2, 2, 2, 2] produces 100 rows in instance 1.
EXPECT_EQ(downstream_recvr->_sender_queues[0]->_block_queue.size(), 2);
EXPECT_EQ(downstream_recvr->_sender_queues[0]->_block_queue.front()._block->rows(),
10 * 10);
EXPECT_EQ(downstream_recvr->_sender_queues[0]->_block_queue.back()._block->rows(), 10);
EXPECT_EQ(downstream_recvr->_sender_queues[0]->_num_remaining_senders, 0);
}
downstream_recvr->close();
}
} // namespace doris::pipeline