| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include <gtest/gtest.h> |
| |
| #include <boost/thread/thread.hpp> |
| #include <iostream> |
| |
| #include "common/status.h" |
| #include "exprs/slot_ref.h" |
| #include "gen_cpp/BackendService.h" |
| #include "gen_cpp/Descriptors_types.h" |
| #include "gen_cpp/Types_types.h" |
| #include "runtime/client_cache.h" |
| #include "runtime/data_stream_mgr.h" |
| #include "runtime/data_stream_recvr.h" |
| #include "runtime/data_stream_sender.h" |
| #include "runtime/descriptors.h" |
| #include "runtime/raw_value.h" |
| #include "runtime/row_batch.h" |
| #include "runtime/runtime_state.h" |
| #include "util/cpu_info.h" |
| #include "util/debug_util.h" |
| #include "util/disk_info.h" |
| #include "util/logging.h" |
| #include "util/mem_info.h" |
| #include "util/thrift_server.h" |
| |
| using std::string; |
| using std::vector; |
| using std::multiset; |
| |
| using boost::scoped_ptr; |
| using boost::thread; |
| |
| namespace doris { |
| |
| class DorisTestBackend : public BackendServiceIf { |
| public: |
| DorisTestBackend(DataStreamMgr* stream_mgr) : _mgr(stream_mgr) {} |
| virtual ~DorisTestBackend() {} |
| |
| virtual void exec_plan_fragment(TExecPlanFragmentResult& return_val, |
| const TExecPlanFragmentParams& params) {} |
| |
| virtual void cancel_plan_fragment(TCancelPlanFragmentResult& return_val, |
| const TCancelPlanFragmentParams& params) {} |
| |
| virtual void transmit_data(TTransmitDataResult& return_val, const TTransmitDataParams& params) { |
| /* |
| LOG(ERROR) << "transmit_data(): instance_id=" << params.dest_fragment_instance_id |
| << " node_id=" << params.dest_node_id |
| << " #rows=" << params.row_batch.num_rows |
| << " eos=" << (params.eos ? "true" : "false"); |
| if (!params.eos) { |
| _mgr->add_data( |
| params.dest_fragment_instance_id, |
| params.dest_node_id, |
| params.row_batch, |
| params.sender_id).set_t_status(&return_val); |
| } else { |
| Status status = _mgr->close_sender( |
| params.dest_fragment_instance_id, params.dest_node_id, params.sender_id, params.be_number); |
| status.set_t_status(&return_val); |
| LOG(ERROR) << "close_sender status: " << status.get_error_msg(); |
| } |
| */ |
| } |
| |
| virtual void fetch_data(TFetchDataResult& return_val, const TFetchDataParams& params) {} |
| |
| virtual void submit_tasks(TAgentResult& return_val, |
| const std::vector<TAgentTaskRequest>& tasks) {} |
| |
| virtual void make_snapshot(TAgentResult& return_val, const TSnapshotRequest& snapshot_request) { |
| } |
| |
| virtual void release_snapshot(TAgentResult& return_val, const std::string& snapshot_path) {} |
| |
| virtual void publish_cluster_state(TAgentResult& return_val, |
| const TAgentPublishRequest& request) {} |
| |
| virtual void submit_etl_task(TAgentResult& return_val, const TMiniLoadEtlTaskRequest& request) { |
| } |
| |
| virtual void get_etl_status(TMiniLoadEtlStatusResult& return_val, |
| const TMiniLoadEtlStatusRequest& request) {} |
| |
| virtual void delete_etl_files(TAgentResult& return_val, const TDeleteEtlFilesRequest& request) { |
| } |
| |
| virtual void register_pull_load_task(TStatus& _return, const TUniqueId& id, |
| const int32_t num_senders) {} |
| |
| virtual void deregister_pull_load_task(TStatus& _return, const TUniqueId& id) {} |
| |
| virtual void report_pull_load_sub_task_info(TStatus& _return, |
| const TPullLoadSubTaskInfo& task_info) {} |
| |
| virtual void fetch_pull_load_task_info(TFetchPullLoadTaskInfoResult& _return, |
| const TUniqueId& id) {} |
| |
| virtual void fetch_all_pull_load_task_infos(TFetchAllPullLoadTaskInfosResult& _return) {} |
| |
| private: |
| DataStreamMgr* _mgr; |
| }; |
| |
| class DataStreamTest : public testing::Test { |
| protected: |
| DataStreamTest() |
| : _limit(new MemTracker(-1)), |
| _runtime_state(TUniqueId(), TQueryOptions(), "", &_exec_env), |
| _next_val(0) { |
| _exec_env.init_for_tests(); |
| _runtime_state.init_mem_trackers(TUniqueId()); |
| } |
| // null dtor to pass codestyle check |
| ~DataStreamTest() {} |
| |
| virtual void SetUp() { |
| create_row_desc(); |
| create_tuple_comparator(); |
| create_row_batch(); |
| |
| _next_instance_id.lo = 0; |
| _next_instance_id.hi = 0; |
| _stream_mgr = new DataStreamMgr(); |
| |
| _broadcast_sink.dest_node_id = DEST_NODE_ID; |
| _broadcast_sink.output_partition.type = TPartitionType::UNPARTITIONED; |
| |
| _random_sink.dest_node_id = DEST_NODE_ID; |
| _random_sink.output_partition.type = TPartitionType::RANDOM; |
| |
| _hash_sink.dest_node_id = DEST_NODE_ID; |
| _hash_sink.output_partition.type = TPartitionType::HASH_PARTITIONED; |
| // there's only one column to partition on |
| TExprNode expr_node; |
| expr_node.node_type = TExprNodeType::SLOT_REF; |
| expr_node.type.types.push_back(TTypeNode()); |
| expr_node.type.types.back().__isset.scalar_type = true; |
| expr_node.type.types.back().scalar_type.type = TPrimitiveType::BIGINT; |
| expr_node.num_children = 0; |
| TSlotRef slot_ref; |
| slot_ref.slot_id = 0; |
| expr_node.__set_slot_ref(slot_ref); |
| TExpr expr; |
| expr.nodes.push_back(expr_node); |
| _hash_sink.output_partition.__isset.partition_exprs = true; |
| _hash_sink.output_partition.partition_exprs.push_back(expr); |
| |
| // Ensure that individual sender info addresses don't change |
| _sender_info.reserve(MAX_SENDERS); |
| _receiver_info.reserve(MAX_RECEIVERS); |
| start_backend(); |
| } |
| |
| const TDataStreamSink& get_sink(TPartitionType::type partition_type) { |
| switch (partition_type) { |
| case TPartitionType::UNPARTITIONED: |
| return _broadcast_sink; |
| case TPartitionType::RANDOM: |
| return _random_sink; |
| case TPartitionType::HASH_PARTITIONED: |
| return _hash_sink; |
| default: |
| DCHECK(false) << "Unhandled sink type: " << partition_type; |
| } |
| // Should never reach this. |
| return _broadcast_sink; |
| } |
| |
| virtual void TearDown() { |
| _lhs_slot_ctx->close(NULL); |
| _rhs_slot_ctx->close(NULL); |
| _exec_env.client_cache()->test_shutdown(); |
| stop_backend(); |
| } |
| |
| void reset() { |
| _sender_info.clear(); |
| _receiver_info.clear(); |
| _dest.clear(); |
| } |
| |
| // We reserve contiguous memory for senders in SetUp. If a test uses more |
| // senders, a DCHECK will fail and you should increase this value. |
| static const int MAX_SENDERS = 16; |
| static const int MAX_RECEIVERS = 16; |
| static const PlanNodeId DEST_NODE_ID = 1; |
| static const int BATCH_CAPACITY = 100; // rows |
| static const int PER_ROW_DATA = 8; |
| static const int TOTAL_DATA_SIZE = 8 * 1024; |
| static const int NUM_BATCHES = TOTAL_DATA_SIZE / BATCH_CAPACITY / PER_ROW_DATA; |
| |
| ObjectPool _obj_pool; |
| std::shared_ptr<MemTracker> _limit; |
| std::shared_ptr<MemTracker> _tracker; |
| DescriptorTbl* _desc_tbl; |
| const RowDescriptor* _row_desc; |
| TupleRowComparator* _less_than; |
| ExecEnv _exec_env; |
| RuntimeState _runtime_state; |
| TUniqueId _next_instance_id; |
| string _stmt; |
| |
| // RowBatch generation |
| boost::scoped_ptr<RowBatch> _batch; |
| int _next_val; |
| int64_t* _tuple_mem; |
| |
| // receiving node |
| DataStreamMgr* _stream_mgr; |
| ThriftServer* _server; |
| |
| // sending node(s) |
| TDataStreamSink _broadcast_sink; |
| TDataStreamSink _random_sink; |
| TDataStreamSink _hash_sink; |
| std::vector<TPlanFragmentDestination> _dest; |
| |
| struct SenderInfo { |
| thread* thread_handle; |
| Status status; |
| int num_bytes_sent; |
| |
| SenderInfo() : thread_handle(NULL), num_bytes_sent(0) {} |
| }; |
| std::vector<SenderInfo> _sender_info; |
| |
| struct ReceiverInfo { |
| TPartitionType::type stream_type; |
| int num_senders; |
| int receiver_num; |
| |
| thread* thread_handle; |
| boost::shared_ptr<DataStreamRecvr> stream_recvr; |
| Status status; |
| int num_rows_received; |
| multiset<int64_t> data_values; |
| |
| ReceiverInfo(TPartitionType::type stream_type, int num_senders, int receiver_num) |
| : stream_type(stream_type), |
| num_senders(num_senders), |
| receiver_num(receiver_num), |
| thread_handle(NULL), |
| stream_recvr(NULL), |
| num_rows_received(0) {} |
| |
| ~ReceiverInfo() { |
| delete thread_handle; |
| stream_recvr.reset(); |
| } |
| }; |
| std::vector<ReceiverInfo> _receiver_info; |
| |
| // Create an instance id and add it to _dest |
| void get_next_instance_id(TUniqueId* instance_id) { |
| _dest.push_back(TPlanFragmentDestination()); |
| TPlanFragmentDestination& dest = _dest.back(); |
| dest.fragment_instance_id = _next_instance_id; |
| dest.server.hostname = "127.0.0.1"; |
| dest.server.port = config::port; |
| *instance_id = _next_instance_id; |
| ++_next_instance_id.lo; |
| } |
| |
| // RowDescriptor to mimic "select bigint_col from alltypesagg", except the slot |
| // isn't nullable |
| void create_row_desc() { |
| // create DescriptorTbl |
| TTupleDescriptor tuple_desc; |
| tuple_desc.__set_id(0); |
| tuple_desc.__set_byteSize(8); |
| tuple_desc.__set_numNullBytes(0); |
| TDescriptorTable thrift_desc_tbl; |
| thrift_desc_tbl.tupleDescriptors.push_back(tuple_desc); |
| TSlotDescriptor slot_desc; |
| slot_desc.__set_id(0); |
| slot_desc.__set_parent(0); |
| |
| slot_desc.slotType.types.push_back(TTypeNode()); |
| slot_desc.slotType.types.back().__isset.scalar_type = true; |
| slot_desc.slotType.types.back().scalar_type.type = TPrimitiveType::BIGINT; |
| |
| slot_desc.__set_columnPos(0); |
| slot_desc.__set_byteOffset(0); |
| slot_desc.__set_nullIndicatorByte(0); |
| slot_desc.__set_nullIndicatorBit(-1); |
| slot_desc.__set_slotIdx(0); |
| slot_desc.__set_isMaterialized(true); |
| thrift_desc_tbl.slotDescriptors.push_back(slot_desc); |
| EXPECT_TRUE(DescriptorTbl::create(&_obj_pool, thrift_desc_tbl, &_desc_tbl).ok()); |
| _runtime_state.set_desc_tbl(_desc_tbl); |
| |
| std::vector<TTupleId> row_tids; |
| row_tids.push_back(0); |
| |
| std::vector<bool> nullable_tuples; |
| nullable_tuples.push_back(false); |
| _row_desc = _obj_pool.add(new RowDescriptor(*_desc_tbl, row_tids, nullable_tuples)); |
| } |
| |
| // Create a tuple comparator to sort in ascending order on the single bigint column. |
| void create_tuple_comparator() { |
| TExprNode expr_node; |
| expr_node.node_type = TExprNodeType::SLOT_REF; |
| expr_node.type.types.push_back(TTypeNode()); |
| expr_node.type.types.back().__isset.scalar_type = true; |
| expr_node.type.types.back().scalar_type.type = TPrimitiveType::BIGINT; |
| expr_node.num_children = 0; |
| TSlotRef slot_ref; |
| slot_ref.slot_id = 0; |
| expr_node.__set_slot_ref(slot_ref); |
| |
| SlotRef* lhs_slot = _obj_pool.add(new SlotRef(expr_node)); |
| _lhs_slot_ctx = _obj_pool.add(new ExprContext(lhs_slot)); |
| SlotRef* rhs_slot = _obj_pool.add(new SlotRef(expr_node)); |
| _rhs_slot_ctx = _obj_pool.add(new ExprContext(rhs_slot)); |
| |
| _lhs_slot_ctx->prepare(&_runtime_state, *_row_desc, _tracker.get()); |
| _rhs_slot_ctx->prepare(&_runtime_state, *_row_desc, _tracker.get()); |
| _lhs_slot_ctx->open(NULL); |
| _rhs_slot_ctx->open(NULL); |
| SortExecExprs* sort_exprs = _obj_pool.add(new SortExecExprs()); |
| sort_exprs->init(vector<ExprContext*>(1, _lhs_slot_ctx), |
| std::vector<ExprContext*>(1, _rhs_slot_ctx)); |
| _less_than = _obj_pool.add(new TupleRowComparator(*sort_exprs, std::vector<bool>(1, true), |
| std::vector<bool>(1, false))); |
| } |
| |
| // Create _batch, but don't fill it with data yet. Assumes we created _row_desc. |
| RowBatch* create_row_batch() { |
| RowBatch* batch = new RowBatch(*_row_desc, BATCH_CAPACITY, _limit.get()); |
| int64_t* tuple_mem = |
| reinterpret_cast<int64_t*>(batch->tuple_data_pool()->allocate(BATCH_CAPACITY * 8)); |
| bzero(tuple_mem, BATCH_CAPACITY * 8); |
| |
| for (int i = 0; i < BATCH_CAPACITY; ++i) { |
| int idx = batch->add_row(); |
| TupleRow* row = batch->get_row(idx); |
| row->set_tuple(0, reinterpret_cast<Tuple*>(&tuple_mem[i])); |
| batch->commit_last_row(); |
| } |
| |
| return batch; |
| } |
| |
| void get_next_batch(RowBatch* batch, int* next_val) { |
| LOG(INFO) << "batch_capacity=" << BATCH_CAPACITY << " next_val=" << *next_val; |
| for (int i = 0; i < BATCH_CAPACITY; ++i) { |
| TupleRow* row = batch->get_row(i); |
| int64_t* val = reinterpret_cast<int64_t*>(row->get_tuple(0)->get_slot(0)); |
| *val = (*next_val)++; |
| } |
| } |
| |
| // Start receiver (expecting given number of senders) in separate thread. |
| void start_receiver(TPartitionType::type stream_type, int num_senders, int receiver_num, |
| int buffer_size, bool is_merging, TUniqueId* out_id = NULL) { |
| VLOG_QUERY << "start receiver"; |
| RuntimeProfile* profile = _obj_pool.add(new RuntimeProfile("TestReceiver")); |
| TUniqueId instance_id; |
| get_next_instance_id(&instance_id); |
| _receiver_info.push_back(ReceiverInfo(stream_type, num_senders, receiver_num)); |
| ReceiverInfo& info = _receiver_info.back(); |
| info.stream_recvr = |
| _stream_mgr->create_recvr(&_runtime_state, *_row_desc, instance_id, DEST_NODE_ID, |
| num_senders, buffer_size, profile, is_merging); |
| if (!is_merging) { |
| info.thread_handle = new thread(&DataStreamTest::read_stream, this, &info); |
| } else { |
| info.thread_handle = |
| new thread(&DataStreamTest::read_stream_merging, this, &info, profile); |
| } |
| |
| if (out_id != NULL) { |
| *out_id = instance_id; |
| } |
| } |
| |
| void join_receivers() { |
| VLOG_QUERY << "join receiver\n"; |
| |
| for (int i = 0; i < _receiver_info.size(); ++i) { |
| _receiver_info[i].thread_handle->join(); |
| _receiver_info[i].stream_recvr->close(); |
| } |
| } |
| |
| // Deplete stream and print batches |
| void read_stream(ReceiverInfo* info) { |
| RowBatch* batch = NULL; |
| VLOG_QUERY << "start reading"; |
| |
| while (!(info->status = info->stream_recvr->get_batch(&batch)).is_cancelled() && |
| (batch != NULL)) { |
| VLOG_QUERY << "read batch #rows=" << (batch != NULL ? batch->num_rows() : 0); |
| |
| for (int i = 0; i < batch->num_rows(); ++i) { |
| TupleRow* row = batch->get_row(i); |
| info->data_values.insert(*static_cast<int64_t*>(row->get_tuple(0)->get_slot(0))); |
| } |
| |
| SleepFor(MonoDelta::FromMilliseconds( |
| 10)); // slow down receiver to exercise buffering logic |
| } |
| |
| if (info->status.is_cancelled()) { |
| VLOG_QUERY << "reader is cancelled"; |
| } |
| |
| VLOG_QUERY << "done reading"; |
| } |
| |
| void read_stream_merging(ReceiverInfo* info, RuntimeProfile* profile) { |
| info->status = info->stream_recvr->create_merger(*_less_than); |
| if (info->status.is_cancelled()) { |
| return; |
| } |
| RowBatch batch(*_row_desc, 1024, _limit.get()); |
| VLOG_QUERY << "start reading merging"; |
| bool eos = false; |
| while (!(info->status = info->stream_recvr->get_next(&batch, &eos)).is_cancelled()) { |
| VLOG_QUERY << "read batch #rows=" << batch.num_rows(); |
| for (int i = 0; i < batch.num_rows(); ++i) { |
| TupleRow* row = batch.get_row(i); |
| info->data_values.insert(*static_cast<int64_t*>(row->get_tuple(0)->get_slot(0))); |
| } |
| SleepFor(MonoDelta::FromMilliseconds( |
| 10)); // slow down receiver to exercise buffering logic |
| batch.reset(); |
| if (eos) { |
| break; |
| } |
| } |
| if (info->status.is_cancelled()) { |
| VLOG_QUERY << "reader is cancelled"; |
| } |
| VLOG_QUERY << "done reading"; |
| } |
| |
| // Verify correctness of receivers' data values. |
| void check_receivers(TPartitionType::type stream_type, int num_senders) { |
| int64_t total = 0; |
| multiset<int64_t> all_data_values; |
| |
| for (int i = 0; i < _receiver_info.size(); ++i) { |
| ReceiverInfo& info = _receiver_info[i]; |
| EXPECT_TRUE(info.status.ok()); |
| total += info.data_values.size(); |
| DCHECK_EQ(info.stream_type, stream_type); |
| DCHECK_EQ(info.num_senders, num_senders); |
| |
| if (stream_type == TPartitionType::UNPARTITIONED) { |
| EXPECT_EQ(NUM_BATCHES * BATCH_CAPACITY * num_senders, info.data_values.size()); |
| } |
| |
| all_data_values.insert(info.data_values.begin(), info.data_values.end()); |
| |
| int k = 0; |
| for (multiset<int64_t>::iterator j = info.data_values.begin(); |
| j != info.data_values.end(); ++j, ++k) { |
| if (stream_type == TPartitionType::UNPARTITIONED) { |
| // unpartitioned streams contain all values as many times as there are |
| // senders |
| EXPECT_EQ(k / num_senders, *j); |
| } else if (stream_type == TPartitionType::HASH_PARTITIONED) { |
| // hash-partitioned streams send values to the right partition |
| int64_t value = *j; |
| uint32_t hash_val = RawValue::get_hash_value_fvn(&value, TYPE_BIGINT, 0U); |
| EXPECT_EQ(hash_val % _receiver_info.size(), info.receiver_num); |
| } |
| } |
| } |
| |
| if (stream_type == TPartitionType::HASH_PARTITIONED) { |
| EXPECT_EQ(NUM_BATCHES * BATCH_CAPACITY * num_senders, total); |
| |
| int k = 0; |
| for (multiset<int64_t>::iterator j = all_data_values.begin(); |
| j != all_data_values.end(); ++j, ++k) { |
| // each sender sent all values |
| EXPECT_EQ(k / num_senders, *j); |
| |
| if (k / num_senders != *j) { |
| break; |
| } |
| } |
| } |
| } |
| |
| void check_senders() { |
| for (int i = 0; i < _sender_info.size(); ++i) { |
| EXPECT_TRUE(_sender_info[i].status.ok()); |
| EXPECT_GT(_sender_info[i].num_bytes_sent, 0) << "info i=" << i; |
| } |
| } |
| |
| // Start backend in separate thread. |
| void start_backend() { |
| boost::shared_ptr<DorisTestBackend> handler(new DorisTestBackend(_stream_mgr)); |
| boost::shared_ptr<apache::thrift::TProcessor> processor( |
| new BackendServiceProcessor(handler)); |
| _server = new ThriftServer("DataStreamTest backend", processor, config::port, NULL); |
| _server->start(); |
| } |
| |
| void stop_backend() { |
| VLOG_QUERY << "stop backend\n"; |
| _server->stop_for_testing(); |
| delete _server; |
| } |
| |
| void start_sender(TPartitionType::type partition_type = TPartitionType::UNPARTITIONED, |
| int channel_buffer_size = 1024) { |
| VLOG_QUERY << "start sender"; |
| int sender_id = _sender_info.size(); |
| DCHECK_LT(sender_id, MAX_SENDERS); |
| _sender_info.push_back(SenderInfo()); |
| SenderInfo& info = _sender_info.back(); |
| info.thread_handle = new thread(&DataStreamTest::sender, this, sender_id, |
| channel_buffer_size, partition_type); |
| } |
| |
| void join_senders() { |
| VLOG_QUERY << "join senders\n"; |
| for (int i = 0; i < _sender_info.size(); ++i) { |
| _sender_info[i].thread_handle->join(); |
| } |
| } |
| |
| void sender(int sender_num, int channel_buffer_size, TPartitionType::type partition_type) { |
| RuntimeState state(TExecPlanFragmentParams(), TQueryOptions(), "", &_exec_env); |
| state.set_desc_tbl(_desc_tbl); |
| state.init_mem_trackers(TUniqueId()); |
| VLOG_QUERY << "create sender " << sender_num; |
| const TDataStreamSink& stream_sink = |
| (partition_type == TPartitionType::UNPARTITIONED ? _broadcast_sink : _hash_sink); |
| DataStreamSender sender(&_obj_pool, sender_num, *_row_desc, stream_sink, _dest, |
| channel_buffer_size); |
| |
| TDataSink data_sink; |
| data_sink.__set_type(TDataSinkType::DATA_STREAM_SINK); |
| data_sink.__set_stream_sink(stream_sink); |
| EXPECT_TRUE(sender.init(data_sink).ok()); |
| |
| EXPECT_TRUE(sender.prepare(&state).ok()); |
| EXPECT_TRUE(sender.open(&state).ok()); |
| boost::scoped_ptr<RowBatch> batch(create_row_batch()); |
| SenderInfo& info = _sender_info[sender_num]; |
| int next_val = 0; |
| |
| for (int i = 0; i < NUM_BATCHES; ++i) { |
| get_next_batch(batch.get(), &next_val); |
| VLOG_QUERY << "sender " << sender_num << ": #rows=" << batch->num_rows(); |
| info.status = sender.send(&state, batch.get()); |
| |
| if (!info.status.ok()) { |
| LOG(WARNING) << "something is wrong when sending: " << info.status.get_error_msg(); |
| break; |
| } |
| } |
| |
| VLOG_QUERY << "closing sender" << sender_num; |
| info.status = sender.close(&state, Status::OK()); |
| info.num_bytes_sent = sender.get_num_data_bytes_sent(); |
| |
| batch->reset(); |
| } |
| |
| void test_stream(TPartitionType::type stream_type, int num_senders, int num_receivers, |
| int buffer_size, bool is_merging) { |
| LOG(INFO) << "Testing stream=" << stream_type << " #senders=" << num_senders |
| << " #receivers=" << num_receivers << " buffer_size=" << buffer_size; |
| reset(); |
| |
| for (int i = 0; i < num_receivers; ++i) { |
| start_receiver(stream_type, num_senders, i, buffer_size, is_merging); |
| } |
| |
| for (int i = 0; i < num_senders; ++i) { |
| start_sender(stream_type, buffer_size); |
| } |
| |
| join_senders(); |
| check_senders(); |
| join_receivers(); |
| check_receivers(stream_type, num_senders); |
| } |
| |
| private: |
| ExprContext* _lhs_slot_ctx; |
| ExprContext* _rhs_slot_ctx; |
| }; |
| |
| TEST_F(DataStreamTest, UnknownSenderSmallResult) { |
| // starting a sender w/o a corresponding receiver does not result in an error because |
| // we cannot distinguish whether a receiver was never created or the receiver |
| // willingly tore down the stream |
| // case 1: entire query result fits in single buffer, close() returns ok |
| TUniqueId dummy_id; |
| get_next_instance_id(&dummy_id); |
| start_sender(TPartitionType::UNPARTITIONED, TOTAL_DATA_SIZE + 1024); |
| join_senders(); |
| EXPECT_TRUE(_sender_info[0].status.ok()); |
| EXPECT_GT(_sender_info[0].num_bytes_sent, 0); |
| } |
| |
| TEST_F(DataStreamTest, UnknownSenderLargeResult) { |
| // case 2: query result requires multiple buffers, send() returns ok |
| TUniqueId dummy_id; |
| get_next_instance_id(&dummy_id); |
| start_sender(); |
| join_senders(); |
| EXPECT_TRUE(_sender_info[0].status.ok()); |
| EXPECT_GT(_sender_info[0].num_bytes_sent, 0); |
| } |
| |
| TEST_F(DataStreamTest, Cancel) { |
| TUniqueId instance_id; |
| start_receiver(TPartitionType::UNPARTITIONED, 1, 1, 1024, false, &instance_id); |
| _stream_mgr->cancel(instance_id); |
| start_receiver(TPartitionType::UNPARTITIONED, 1, 1, 1024, true, &instance_id); |
| _stream_mgr->cancel(instance_id); |
| join_receivers(); |
| EXPECT_TRUE(_receiver_info[0].status.is_cancelled()); |
| } |
| |
| TEST_F(DataStreamTest, BasicTest) { |
| // TODO: also test that all client connections have been returned |
| TPartitionType::type stream_types[] = {TPartitionType::UNPARTITIONED, |
| TPartitionType::HASH_PARTITIONED}; |
| int sender_nums[] = {1, 3}; |
| int receiver_nums[] = {1, 3}; |
| int buffer_sizes[] = {1024, 1024 * 1024}; |
| bool merging[] = {false, true}; |
| |
| // test_stream(TPartitionType::HASH_PARTITIONED, 1, 3, 1024, true); |
| for (int i = 0; i < sizeof(stream_types) / sizeof(*stream_types); ++i) { |
| for (int j = 0; j < sizeof(sender_nums) / sizeof(int); ++j) { |
| for (int k = 0; k < sizeof(receiver_nums) / sizeof(int); ++k) { |
| for (int l = 0; l < sizeof(buffer_sizes) / sizeof(int); ++l) { |
| for (int m = 0; m < sizeof(merging) / sizeof(bool); ++m) { |
| LOG(ERROR) << "before test: stream_type=" << stream_types[i] |
| << " sender num=" << sender_nums[j] |
| << " receiver_num=" << receiver_nums[k] |
| << " buffer_size=" << buffer_sizes[l] |
| << " merging=" << (merging[m] ? "true" : "false"); |
| test_stream(stream_types[i], sender_nums[j], receiver_nums[k], |
| buffer_sizes[l], merging[m]); |
| LOG(ERROR) << "after test: stream_type=" << stream_types[i] |
| << " sender num=" << sender_nums[j] |
| << " receiver_num=" << receiver_nums[k] |
| << " buffer_size=" << buffer_sizes[l] |
| << " merging=" << (merging[m] ? "true" : "false"); |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| // TODO: more tests: |
| // - test case for transmission error in last batch |
| // - receivers getting created concurrently |
| |
| } // namespace doris |
| |
| int main(int argc, char** argv) { |
| // std::string conffile = std::string(getenv("DORIS_HOME")) + "/conf/be.conf"; |
| // if (!doris::config::init(conffile.c_str(), false)) { |
| // fprintf(stderr, "error read config file. conffile path= %s\n", conffile.c_str()); |
| // return -1; |
| // } |
| doris::config::query_scratch_dirs = "/tmp"; |
| doris::config::max_free_io_buffers = 128; |
| doris::config::disable_mem_pools = false; |
| doris::config::min_buffer_size = 1024; |
| doris::config::read_size = 8388608; |
| doris::config::port = 2001; |
| doris::config::thrift_connect_timeout_seconds = 20; |
| |
| doris::init_glog("be-test"); |
| ::testing::InitGoogleTest(&argc, argv); |
| |
| doris::CpuInfo::init(); |
| doris::DiskInfo::init(); |
| doris::MemInfo::init(); |
| |
| return RUN_ALL_TESTS(); |
| } |