| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| #include <brpc/closure_guard.h> |
| #include <brpc/server.h> |
| #include <gen_cpp/DataSinks_types.h> |
| #include <gen_cpp/Descriptors_types.h> |
| #include <gen_cpp/Exprs_types.h> |
| #include <gtest/gtest-message.h> |
| #include <gtest/gtest-test-part.h> |
| |
| #include <filesystem> |
| #include <string> |
| #include <vector> |
| |
| #include "common/config.h" |
| #include "common/object_pool.h" |
| #include "gen_cpp/HeartbeatService_types.h" |
| #include "gen_cpp/internal_service.pb.h" |
| #include "gtest/gtest_pred_impl.h" |
| #include "io/fs/local_file_system.h" |
| #include "olap/olap_define.h" |
| #include "olap/wal/wal_manager.h" |
| #include "runtime/decimalv2_value.h" |
| #include "runtime/define_primitive_type.h" |
| #include "runtime/descriptor_helper.h" |
| #include "runtime/descriptors.h" |
| #include "runtime/exec_env.h" |
| #include "runtime/runtime_state.h" |
| #include "util/brpc_client_cache.h" |
| #include "util/debug/leakcheck_disabler.h" |
| #include "util/proto_util.h" |
| #include "util/threadpool.h" |
| #include "vec/core/block.h" |
| #include "vec/core/column_with_type_and_name.h" |
| #include "vec/sink/volap_table_sink.h" |
| |
| namespace google { |
| namespace protobuf { |
| class RpcController; |
| } // namespace protobuf |
| } // namespace google |
| |
| namespace doris { |
| class PFunctionService_Stub; |
| |
| namespace vectorized { |
| |
| Status k_add_batch_status; |
| |
| TDataSink get_data_sink(TDescriptorTable* desc_tbl) { |
| int64_t db_id = 1; |
| int64_t table_id = 2; |
| int64_t partition_id = 3; |
| int64_t index1_id = 4; |
| int64_t tablet1_id = 6; |
| int64_t tablet2_id = 7; |
| |
| TDataSink data_sink; |
| data_sink.type = TDataSinkType::OLAP_TABLE_SINK; |
| data_sink.__isset.olap_table_sink = true; |
| |
| TOlapTableSink& tsink = data_sink.olap_table_sink; |
| tsink.load_id.hi = 123; |
| tsink.load_id.lo = 456; |
| tsink.txn_id = 789; |
| tsink.db_id = 1; |
| tsink.table_id = 2; |
| tsink.tuple_id = 0; |
| tsink.num_replicas = 3; |
| tsink.db_name = "testDb"; |
| tsink.table_name = "testTable"; |
| |
| // construct schema |
| TOlapTableSchemaParam& tschema = tsink.schema; |
| tschema.db_id = 1; |
| tschema.table_id = 2; |
| tschema.version = 0; |
| |
| // descriptor |
| { |
| TDescriptorTableBuilder dtb; |
| { |
| TTupleDescriptorBuilder tuple_builder; |
| |
| tuple_builder.add_slot(TSlotDescriptorBuilder() |
| .type(TYPE_INT) |
| .column_name("c1") |
| .column_pos(1) |
| .build()); |
| tuple_builder.add_slot(TSlotDescriptorBuilder() |
| .type(TYPE_BIGINT) |
| .column_name("c2") |
| .column_pos(2) |
| .build()); |
| tuple_builder.add_slot(TSlotDescriptorBuilder() |
| .string_type(10) |
| .column_name("c3") |
| .column_pos(3) |
| .build()); |
| |
| tuple_builder.build(&dtb); |
| } |
| { |
| TTupleDescriptorBuilder tuple_builder; |
| |
| tuple_builder.add_slot(TSlotDescriptorBuilder() |
| .type(TYPE_INT) |
| .column_name("c1") |
| .column_pos(1) |
| .build()); |
| tuple_builder.add_slot(TSlotDescriptorBuilder() |
| .type(TYPE_BIGINT) |
| .column_name("c2") |
| .column_pos(2) |
| .build()); |
| tuple_builder.add_slot(TSlotDescriptorBuilder() |
| .string_type(20) |
| .column_name("c3") |
| .column_pos(3) |
| .build()); |
| |
| tuple_builder.build(&dtb); |
| } |
| |
| *desc_tbl = dtb.desc_tbl(); |
| tschema.slot_descs = desc_tbl->slotDescriptors; |
| tschema.tuple_desc = desc_tbl->tupleDescriptors[0]; |
| } |
| // index |
| tschema.indexes.resize(1); |
| tschema.indexes[0].id = index1_id; |
| tschema.indexes[0].columns = {"c1", "c2", "c3"}; |
| // tschema.indexes[1].id = 5; |
| // tschema.indexes[1].columns = {"c1", "c3"}; |
| // partition |
| TOlapTablePartitionParam& tpartition = tsink.partition; |
| tpartition.db_id = db_id; |
| tpartition.table_id = table_id; |
| tpartition.version = table_id; |
| tpartition.__set_partition_column("c2"); |
| tpartition.__set_distributed_columns({"c1", "c3"}); |
| tpartition.partitions.resize(1); |
| tpartition.partitions[0].id = partition_id; |
| tpartition.partitions[0].num_buckets = 2; |
| tpartition.partitions[0].indexes.resize(1); |
| tpartition.partitions[0].indexes[0].index_id = index1_id; |
| tpartition.partitions[0].indexes[0].tablets = {tablet1_id, tablet2_id}; |
| // location |
| TOlapTableLocationParam& location = tsink.location; |
| location.db_id = db_id; |
| location.table_id = table_id; |
| location.version = 0; |
| location.tablets.resize(2); |
| location.tablets[0].tablet_id = tablet1_id; |
| location.tablets[0].node_ids = {0, 1, 2}; |
| location.tablets[1].tablet_id = tablet2_id; |
| location.tablets[1].node_ids = {0, 1, 2}; |
| // location |
| TPaloNodesInfo& nodes_info = tsink.nodes_info; |
| nodes_info.nodes.resize(3); |
| nodes_info.nodes[0].id = 0; |
| nodes_info.nodes[0].host = "127.0.0.1"; |
| nodes_info.nodes[0].async_internal_port = 4356; |
| nodes_info.nodes[1].id = 1; |
| nodes_info.nodes[1].host = "127.0.0.1"; |
| nodes_info.nodes[1].async_internal_port = 4356; |
| nodes_info.nodes[2].id = 2; |
| nodes_info.nodes[2].host = "127.0.0.1"; |
| nodes_info.nodes[2].async_internal_port = 4357; |
| |
| return data_sink; |
| } |
| |
| TDataSink get_decimal_sink(TDescriptorTable* desc_tbl) { |
| int64_t db_id = 1; |
| int64_t table_id = 2; |
| int64_t partition_id = 3; |
| int64_t index1_id = 4; |
| int64_t tablet1_id = 6; |
| int64_t tablet2_id = 7; |
| |
| TDataSink data_sink; |
| data_sink.type = TDataSinkType::OLAP_TABLE_SINK; |
| data_sink.__isset.olap_table_sink = true; |
| |
| TOlapTableSink& tsink = data_sink.olap_table_sink; |
| tsink.load_id.hi = 123; |
| tsink.load_id.lo = 456; |
| tsink.txn_id = 789; |
| tsink.db_id = 1; |
| tsink.table_id = 2; |
| tsink.tuple_id = 0; |
| tsink.num_replicas = 3; |
| tsink.db_name = "testDb"; |
| tsink.table_name = "testTable"; |
| |
| // construct schema |
| TOlapTableSchemaParam& tschema = tsink.schema; |
| tschema.db_id = 1; |
| tschema.table_id = 2; |
| tschema.version = 0; |
| |
| // descriptor |
| { |
| TDescriptorTableBuilder dtb; |
| { |
| TTupleDescriptorBuilder tuple_builder; |
| |
| tuple_builder.add_slot(TSlotDescriptorBuilder() |
| .type(TYPE_INT) |
| .column_name("c1") |
| .column_pos(1) |
| .build()); |
| tuple_builder.add_slot(TSlotDescriptorBuilder() |
| .decimal_type(5, 2) |
| .column_name("c2") |
| .column_pos(2) |
| .build()); |
| |
| tuple_builder.build(&dtb); |
| } |
| |
| *desc_tbl = dtb.desc_tbl(); |
| tschema.slot_descs = desc_tbl->slotDescriptors; |
| tschema.tuple_desc = desc_tbl->tupleDescriptors[0]; |
| } |
| // index |
| tschema.indexes.resize(1); |
| tschema.indexes[0].id = index1_id; |
| tschema.indexes[0].columns = {"c1", "c2"}; |
| // tschema.indexes[1].id = 5; |
| // tschema.indexes[1].columns = {"c1", "c3"}; |
| // partition |
| TOlapTablePartitionParam& tpartition = tsink.partition; |
| tpartition.db_id = db_id; |
| tpartition.table_id = table_id; |
| tpartition.version = table_id; |
| tpartition.__set_partition_column("c1"); |
| tpartition.__set_distributed_columns({"c2"}); |
| tpartition.partitions.resize(1); |
| tpartition.partitions[0].id = partition_id; |
| tpartition.partitions[0].num_buckets = 2; |
| tpartition.partitions[0].indexes.resize(1); |
| tpartition.partitions[0].indexes[0].index_id = index1_id; |
| tpartition.partitions[0].indexes[0].tablets = {tablet1_id, tablet2_id}; |
| // location |
| TOlapTableLocationParam& location = tsink.location; |
| location.db_id = db_id; |
| location.table_id = table_id; |
| location.version = 0; |
| location.tablets.resize(2); |
| location.tablets[0].tablet_id = tablet1_id; |
| location.tablets[0].node_ids = {0, 1, 2}; |
| location.tablets[1].tablet_id = tablet2_id; |
| location.tablets[1].node_ids = {0, 1, 2}; |
| // location |
| TPaloNodesInfo& nodes_info = tsink.nodes_info; |
| nodes_info.nodes.resize(3); |
| nodes_info.nodes[0].id = 0; |
| nodes_info.nodes[0].host = "127.0.0.1"; |
| nodes_info.nodes[0].async_internal_port = 4356; |
| nodes_info.nodes[1].id = 1; |
| nodes_info.nodes[1].host = "127.0.0.1"; |
| nodes_info.nodes[1].async_internal_port = 4356; |
| nodes_info.nodes[2].id = 2; |
| nodes_info.nodes[2].host = "127.0.0.1"; |
| nodes_info.nodes[2].async_internal_port = 4357; |
| |
| return data_sink; |
| } |
| |
| class VTestInternalService : public PBackendService { |
| public: |
| VTestInternalService() {} |
| virtual ~VTestInternalService() {} |
| |
| void transmit_data(::google::protobuf::RpcController* controller, |
| const ::doris::PTransmitDataParams* request, |
| ::doris::PTransmitDataResult* response, |
| ::google::protobuf::Closure* done) override { |
| brpc::ClosureGuard done_guard(done); |
| } |
| |
| void tablet_writer_open(google::protobuf::RpcController* controller, |
| const PTabletWriterOpenRequest* request, |
| PTabletWriterOpenResult* response, |
| google::protobuf::Closure* done) override { |
| brpc::ClosureGuard done_guard(done); |
| Status status; |
| status.to_protobuf(response->mutable_status()); |
| } |
| |
| void tablet_writer_add_block(google::protobuf::RpcController* controller, |
| const PTabletWriterAddBlockRequest* request, |
| PTabletWriterAddBlockResult* response, |
| google::protobuf::Closure* done) override { |
| brpc::ClosureGuard done_guard(done); |
| { |
| std::lock_guard<std::mutex> l(_lock); |
| _row_counters += request->tablet_ids_size(); |
| if (request->eos()) { |
| _eof_counters++; |
| } |
| k_add_batch_status.to_protobuf(response->mutable_status()); |
| |
| if (request->has_block() && _row_desc != nullptr) { |
| vectorized::Block block; |
| static_cast<void>(block.deserialize(request->block())); |
| |
| for (size_t row_num = 0; row_num < block.rows(); ++row_num) { |
| std::stringstream out; |
| out << "("; |
| for (size_t i = 0; i < block.columns(); ++i) { |
| if (block.get_by_position(i).column) { |
| out << block.get_by_position(i).to_string(row_num); |
| } |
| if (i != block.columns() - 1) { |
| out << ", "; |
| } |
| } |
| out << ")"; |
| _output_set->emplace(out.str()); |
| } |
| } |
| } |
| } |
| void tablet_writer_cancel(google::protobuf::RpcController* controller, |
| const PTabletWriterCancelRequest* request, |
| PTabletWriterCancelResult* response, |
| google::protobuf::Closure* done) override { |
| brpc::ClosureGuard done_guard(done); |
| } |
| |
| std::mutex _lock; |
| int64_t _eof_counters = 0; |
| int64_t _row_counters = 0; |
| RowDescriptor* _row_desc = nullptr; |
| std::set<std::string>* _output_set = nullptr; |
| }; |
| |
| class VOlapTableSinkTest : public testing::Test { |
| public: |
| VOlapTableSinkTest() {} |
| virtual ~VOlapTableSinkTest() {} |
| void SetUp() override { |
| k_add_batch_status = Status::OK(); |
| _env = ExecEnv::GetInstance(); |
| _env->_master_info = new TMasterInfo(); |
| _env->_master_info->network_address.hostname = "host name"; |
| _env->_master_info->network_address.port = 1234; |
| _env->_internal_client_cache = new BrpcClientCache<PBackendService_Stub>(); |
| _env->_function_client_cache = new BrpcClientCache<PFunctionService_Stub>(); |
| _env->_wal_manager = WalManager::create_shared(_env, wal_dir); |
| static_cast<void>(_env->wal_mgr()->init()); |
| static_cast<void>(ThreadPoolBuilder("SendBatchThreadPool") |
| .set_min_threads(1) |
| .set_max_threads(5) |
| .set_max_queue_size(100) |
| .build(&_env->_send_batch_thread_pool)); |
| config::tablet_writer_open_rpc_timeout_sec = 60; |
| config::max_send_batch_parallelism_per_job = 1; |
| } |
| |
| void TearDown() override { |
| static_cast<void>(io::global_local_filesystem()->delete_directory(wal_dir)); |
| SAFE_STOP(_env->_wal_manager); |
| SAFE_DELETE(_env->_internal_client_cache); |
| SAFE_DELETE(_env->_function_client_cache); |
| SAFE_DELETE(_env->_master_info); |
| if (_server) { |
| _server->Stop(100); |
| _server->Join(); |
| SAFE_DELETE(_server); |
| } |
| } |
| |
| void test_normal(int be_exec_version) { |
| // start brpc service first |
| _server = new brpc::Server(); |
| auto service = new VTestInternalService(); |
| ASSERT_EQ(_server->AddService(service, brpc::SERVER_OWNS_SERVICE), 0); |
| brpc::ServerOptions options; |
| { |
| debug::ScopedLeakCheckDisabler disable_lsan; |
| _server->Start(4356, &options); |
| } |
| |
| TUniqueId fragment_id; |
| TQueryOptions query_options; |
| query_options.batch_size = 1; |
| query_options.be_exec_version = be_exec_version; |
| RuntimeState state(fragment_id, query_options, TQueryGlobals(), _env); |
| std::shared_ptr<TaskExecutionContext> task_ctx_lock = |
| std::make_shared<TaskExecutionContext>(); |
| state.set_task_execution_context(task_ctx_lock); |
| state.init_mem_trackers(TUniqueId()); |
| |
| ObjectPool obj_pool; |
| TDescriptorTable tdesc_tbl; |
| auto t_data_sink = get_data_sink(&tdesc_tbl); |
| |
| // crate desc_tabl |
| DescriptorTbl* desc_tbl = nullptr; |
| auto st = DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl); |
| ASSERT_TRUE(st.ok()); |
| state._desc_tbl = desc_tbl; |
| |
| TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0); |
| LOG(INFO) << "tuple_desc=" << tuple_desc->debug_string(); |
| |
| RowDescriptor row_desc(*desc_tbl, {0}, {false}); |
| service->_row_desc = &row_desc; |
| std::set<std::string> output_set; |
| service->_output_set = &output_set; |
| |
| std::vector<TExpr> exprs; |
| VOlapTableSink sink(&obj_pool, row_desc, exprs); |
| ASSERT_TRUE(st.ok()); |
| |
| // init |
| st = sink.init(t_data_sink); |
| ASSERT_TRUE(st.ok()); |
| // prepare |
| st = sink.prepare(&state); |
| ASSERT_TRUE(st.ok()); |
| // open |
| st = sink.open(&state); |
| ASSERT_TRUE(st.ok()); |
| |
| int slot_count = tuple_desc->slots().size(); |
| std::vector<vectorized::MutableColumnPtr> columns(slot_count); |
| for (int i = 0; i < slot_count; i++) { |
| columns[i] = tuple_desc->slots()[i]->get_empty_mutable_column(); |
| } |
| |
| int col_idx = 0; |
| auto* column_ptr = columns[col_idx++].get(); |
| auto column_vector_int = column_ptr; |
| int int_val = 12; |
| column_vector_int->insert_data((const char*)&int_val, 0); |
| int_val = 13; |
| column_vector_int->insert_data((const char*)&int_val, 0); |
| int_val = 14; |
| column_vector_int->insert_data((const char*)&int_val, 0); |
| |
| column_ptr = columns[col_idx++].get(); |
| auto column_vector_bigint = column_ptr; |
| int64_t int64_val = 9; |
| column_vector_bigint->insert_data((const char*)&int64_val, 0); |
| int64_val = 25; |
| column_vector_bigint->insert_data((const char*)&int64_val, 0); |
| int64_val = 50; |
| column_vector_bigint->insert_data((const char*)&int64_val, 0); |
| |
| column_ptr = columns[col_idx++].get(); |
| auto column_vector_str = column_ptr; |
| column_vector_str->insert_data("abc", 3); |
| column_vector_str->insert_data("abcd", 4); |
| column_vector_str->insert_data("abcde1234567890", 15); |
| |
| vectorized::Block block; |
| col_idx = 0; |
| for (const auto slot_desc : tuple_desc->slots()) { |
| block.insert(vectorized::ColumnWithTypeAndName(std::move(columns[col_idx++]), |
| slot_desc->get_data_type_ptr(), |
| slot_desc->col_name())); |
| } |
| |
| // send |
| st = sink.send(&state, &block); |
| ASSERT_TRUE(st.ok()); |
| // close |
| st = sink.close(&state, Status::OK()); |
| ASSERT_TRUE(st.ok() || st.to_string() == "Internal error: wait close failed. ") |
| << st.to_string(); |
| |
| // each node has a eof |
| ASSERT_EQ(2, service->_eof_counters); |
| ASSERT_EQ(2 * 2, service->_row_counters); |
| |
| // 2node * 2 |
| ASSERT_EQ(1, state.num_rows_load_filtered()); |
| } |
| |
| private: |
| ExecEnv* _env = nullptr; |
| brpc::Server* _server = nullptr; |
| std::string wal_dir = std::string(getenv("DORIS_HOME")) + "/wal_test"; |
| }; |
| |
| TEST_F(VOlapTableSinkTest, normal) { |
| test_normal(1); |
| } |
| |
| TEST_F(VOlapTableSinkTest, fallback) { |
| test_normal(0); |
| } |
| |
| TEST_F(VOlapTableSinkTest, convert) { |
| // start brpc service first |
| _server = new brpc::Server(); |
| auto service = new VTestInternalService(); |
| ASSERT_EQ(_server->AddService(service, brpc::SERVER_OWNS_SERVICE), 0); |
| brpc::ServerOptions options; |
| { |
| debug::ScopedLeakCheckDisabler disable_lsan; |
| _server->Start(4356, &options); |
| } |
| |
| TUniqueId fragment_id; |
| TQueryOptions query_options; |
| query_options.batch_size = 1024; |
| query_options.be_exec_version = 1; |
| RuntimeState state(fragment_id, query_options, TQueryGlobals(), _env); |
| std::shared_ptr<TaskExecutionContext> task_ctx_lock = std::make_shared<TaskExecutionContext>(); |
| state.set_task_execution_context(task_ctx_lock); |
| state.init_mem_trackers(TUniqueId()); |
| |
| ObjectPool obj_pool; |
| TDescriptorTable tdesc_tbl; |
| auto t_data_sink = get_data_sink(&tdesc_tbl); |
| |
| // crate desc_tabl |
| DescriptorTbl* desc_tbl = nullptr; |
| auto st = DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl); |
| ASSERT_TRUE(st.ok()); |
| state._desc_tbl = desc_tbl; |
| |
| TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0); |
| |
| RowDescriptor row_desc(*desc_tbl, {0}, {false}); |
| |
| // expr |
| std::vector<TExpr> exprs; |
| exprs.resize(3); |
| exprs[0].nodes.resize(1); |
| exprs[0].nodes[0].node_type = TExprNodeType::SLOT_REF; |
| exprs[0].nodes[0].type = tdesc_tbl.slotDescriptors[3].slotType; |
| exprs[0].nodes[0].num_children = 0; |
| exprs[0].nodes[0].__isset.slot_ref = true; |
| exprs[0].nodes[0].slot_ref.slot_id = 0; |
| exprs[0].nodes[0].slot_ref.tuple_id = 1; |
| |
| exprs[1].nodes.resize(1); |
| exprs[1].nodes[0].node_type = TExprNodeType::SLOT_REF; |
| exprs[1].nodes[0].type = tdesc_tbl.slotDescriptors[4].slotType; |
| exprs[1].nodes[0].num_children = 0; |
| exprs[1].nodes[0].__isset.slot_ref = true; |
| exprs[1].nodes[0].slot_ref.slot_id = 1; |
| exprs[1].nodes[0].slot_ref.tuple_id = 1; |
| |
| exprs[2].nodes.resize(1); |
| exprs[2].nodes[0].node_type = TExprNodeType::SLOT_REF; |
| exprs[2].nodes[0].type = tdesc_tbl.slotDescriptors[5].slotType; |
| exprs[2].nodes[0].num_children = 0; |
| exprs[2].nodes[0].__isset.slot_ref = true; |
| exprs[2].nodes[0].slot_ref.slot_id = 2; |
| exprs[2].nodes[0].slot_ref.tuple_id = 1; |
| |
| VOlapTableSink sink(&obj_pool, row_desc, exprs); |
| ASSERT_TRUE(st.ok()); |
| |
| // set output tuple_id |
| t_data_sink.olap_table_sink.tuple_id = 1; |
| // init |
| st = sink.init(t_data_sink); |
| ASSERT_TRUE(st.ok()); |
| // prepare |
| st = sink.prepare(&state); |
| ASSERT_TRUE(st.ok()); |
| // open |
| st = sink.open(&state); |
| ASSERT_TRUE(st.ok()); |
| // send |
| int slot_count = tuple_desc->slots().size(); |
| std::vector<vectorized::MutableColumnPtr> columns(slot_count); |
| for (int i = 0; i < slot_count; i++) { |
| columns[i] = tuple_desc->slots()[i]->get_empty_mutable_column(); |
| } |
| |
| int col_idx = 0; |
| auto* column_ptr = columns[col_idx++].get(); |
| auto column_vector_int = column_ptr; |
| int int_val = 12; |
| column_vector_int->insert_data((const char*)&int_val, 0); |
| int_val = 13; |
| column_vector_int->insert_data((const char*)&int_val, 0); |
| int_val = 14; |
| column_vector_int->insert_data((const char*)&int_val, 0); |
| |
| column_ptr = columns[col_idx++].get(); |
| auto column_vector_bigint = column_ptr; |
| int64_t int64_val = 9; |
| column_vector_bigint->insert_data((const char*)&int64_val, 0); |
| int64_val = 25; |
| column_vector_bigint->insert_data((const char*)&int64_val, 0); |
| int64_val = 50; |
| column_vector_bigint->insert_data((const char*)&int64_val, 0); |
| |
| column_ptr = columns[col_idx++].get(); |
| auto column_vector_str = column_ptr; |
| column_vector_str->insert_data("abc", 3); |
| column_vector_str->insert_data("abcd", 4); |
| column_vector_str->insert_data("abcde", 5); |
| |
| vectorized::Block block; |
| col_idx = 0; |
| for (const auto slot_desc : tuple_desc->slots()) { |
| block.insert(vectorized::ColumnWithTypeAndName(std::move(columns[col_idx++]), |
| slot_desc->get_data_type_ptr(), |
| slot_desc->col_name())); |
| } |
| st = sink.send(&state, &block); |
| ASSERT_TRUE(st.ok()); |
| // close |
| st = sink.close(&state, Status::OK()); |
| ASSERT_TRUE(st.ok() || st.to_string() == "Internal error: wait close failed. ") |
| << st.to_string(); |
| |
| // each node has a eof |
| ASSERT_EQ(2, service->_eof_counters); |
| ASSERT_EQ(2 * 3, service->_row_counters); |
| |
| // 2node * 2 |
| ASSERT_EQ(0, state.num_rows_load_filtered()); |
| } |
| |
| TEST_F(VOlapTableSinkTest, add_block_failed) { |
| // start brpc service first |
| _server = new brpc::Server(); |
| auto service = new VTestInternalService(); |
| ASSERT_EQ(_server->AddService(service, brpc::SERVER_OWNS_SERVICE), 0); |
| brpc::ServerOptions options; |
| { |
| debug::ScopedLeakCheckDisabler disable_lsan; |
| _server->Start(4356, &options); |
| } |
| |
| // ObjectPool create before RuntimeState, simulate actual situation better. |
| ObjectPool obj_pool; |
| |
| TUniqueId fragment_id; |
| TQueryOptions query_options; |
| query_options.batch_size = 1; |
| query_options.be_exec_version = 1; |
| RuntimeState state(fragment_id, query_options, TQueryGlobals(), _env); |
| std::shared_ptr<TaskExecutionContext> task_ctx_lock = std::make_shared<TaskExecutionContext>(); |
| state.set_task_execution_context(task_ctx_lock); |
| state.init_mem_trackers(TUniqueId()); |
| |
| TDescriptorTable tdesc_tbl; |
| auto t_data_sink = get_data_sink(&tdesc_tbl); |
| |
| // crate desc_tabl |
| DescriptorTbl* desc_tbl = nullptr; |
| auto st = DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl); |
| ASSERT_TRUE(st.ok()); |
| state._desc_tbl = desc_tbl; |
| |
| RowDescriptor row_desc(*desc_tbl, {0}, {false}); |
| |
| // expr |
| std::vector<TExpr> exprs; |
| exprs.resize(3); |
| exprs[0].nodes.resize(1); |
| exprs[0].nodes[0].node_type = TExprNodeType::SLOT_REF; |
| exprs[0].nodes[0].type = tdesc_tbl.slotDescriptors[3].slotType; |
| exprs[0].nodes[0].num_children = 0; |
| exprs[0].nodes[0].__isset.slot_ref = true; |
| exprs[0].nodes[0].slot_ref.slot_id = 0; |
| exprs[0].nodes[0].slot_ref.tuple_id = 1; |
| |
| exprs[1].nodes.resize(1); |
| exprs[1].nodes[0].node_type = TExprNodeType::SLOT_REF; |
| exprs[1].nodes[0].type = tdesc_tbl.slotDescriptors[4].slotType; |
| exprs[1].nodes[0].num_children = 0; |
| exprs[1].nodes[0].__isset.slot_ref = true; |
| exprs[1].nodes[0].slot_ref.slot_id = 1; |
| exprs[1].nodes[0].slot_ref.tuple_id = 1; |
| |
| exprs[2].nodes.resize(1); |
| exprs[2].nodes[0].node_type = TExprNodeType::SLOT_REF; |
| exprs[2].nodes[0].type = tdesc_tbl.slotDescriptors[5].slotType; |
| exprs[2].nodes[0].num_children = 0; |
| exprs[2].nodes[0].__isset.slot_ref = true; |
| exprs[2].nodes[0].slot_ref.slot_id = 2; |
| exprs[2].nodes[0].slot_ref.tuple_id = 1; |
| |
| VOlapTableSink sink(&obj_pool, row_desc, exprs); |
| ASSERT_TRUE(st.ok()); |
| |
| // set output tuple_id |
| t_data_sink.olap_table_sink.tuple_id = 1; |
| // init |
| st = sink.init(t_data_sink); |
| ASSERT_TRUE(st.ok()); |
| st = sink.prepare(&state); |
| ASSERT_TRUE(st.ok()); |
| st = sink.open(&state); |
| ASSERT_TRUE(st.ok()); |
| // send |
| TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0); |
| |
| int slot_count = tuple_desc->slots().size(); |
| std::vector<vectorized::MutableColumnPtr> columns(slot_count); |
| for (int i = 0; i < slot_count; i++) { |
| columns[i] = tuple_desc->slots()[i]->get_empty_mutable_column(); |
| } |
| |
| int col_idx = 0; |
| auto* column_ptr = columns[col_idx++].get(); |
| auto column_vector_int = column_ptr; |
| int int_val = 12; |
| column_vector_int->insert_data((const char*)&int_val, 0); |
| |
| column_ptr = columns[col_idx++].get(); |
| auto column_vector_bigint = column_ptr; |
| int64_t int64_val = 9; |
| column_vector_bigint->insert_data((const char*)&int64_val, 0); |
| |
| column_ptr = columns[col_idx++].get(); |
| auto column_vector_str = column_ptr; |
| column_vector_str->insert_data("abc", 3); |
| |
| vectorized::Block block; |
| col_idx = 0; |
| for (const auto slot_desc : tuple_desc->slots()) { |
| block.insert(vectorized::ColumnWithTypeAndName(std::move(columns[col_idx++]), |
| slot_desc->get_data_type_ptr(), |
| slot_desc->col_name())); |
| } |
| // Channels will be cancelled internally, coz brpc returns k_add_batch_status. |
| k_add_batch_status = Status::InternalError("dummy failed"); |
| st = sink.send(&state, &block); |
| ASSERT_TRUE(st.ok()); |
| |
| // Send batch multiple times, can make _cur_batch or _pending_batches(in channels) not empty. |
| // To ensure the order of releasing resource is OK. |
| static_cast<void>(sink.send(&state, &block)); |
| static_cast<void>(sink.send(&state, &block)); |
| |
| // close |
| st = sink.close(&state, Status::OK()); |
| ASSERT_FALSE(st.ok()); |
| } |
| |
| TEST_F(VOlapTableSinkTest, decimal) { |
| // start brpc service first |
| _server = new brpc::Server(); |
| auto service = new VTestInternalService(); |
| ASSERT_EQ(_server->AddService(service, brpc::SERVER_OWNS_SERVICE), 0); |
| brpc::ServerOptions options; |
| { |
| debug::ScopedLeakCheckDisabler disable_lsan; |
| _server->Start(4356, &options); |
| } |
| |
| TUniqueId fragment_id; |
| TQueryOptions query_options; |
| query_options.batch_size = 1; |
| query_options.be_exec_version = 1; |
| RuntimeState state(fragment_id, query_options, TQueryGlobals(), _env); |
| std::shared_ptr<TaskExecutionContext> task_ctx_lock = std::make_shared<TaskExecutionContext>(); |
| state.set_task_execution_context(task_ctx_lock); |
| state.init_mem_trackers(TUniqueId()); |
| |
| ObjectPool obj_pool; |
| TDescriptorTable tdesc_tbl; |
| auto t_data_sink = get_decimal_sink(&tdesc_tbl); |
| |
| // crate desc_tabl |
| DescriptorTbl* desc_tbl = nullptr; |
| auto st = DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl); |
| ASSERT_TRUE(st.ok()); |
| state._desc_tbl = desc_tbl; |
| |
| TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0); |
| LOG(INFO) << "tuple_desc=" << tuple_desc->debug_string(); |
| |
| RowDescriptor row_desc(*desc_tbl, {0}, {false}); |
| service->_row_desc = &row_desc; |
| std::set<std::string> output_set; |
| service->_output_set = &output_set; |
| |
| std::vector<TExpr> exprs; |
| VOlapTableSink sink(&obj_pool, row_desc, exprs); |
| ASSERT_TRUE(st.ok()); |
| |
| // init |
| st = sink.init(t_data_sink); |
| ASSERT_TRUE(st.ok()); |
| // prepare |
| st = sink.prepare(&state); |
| ASSERT_TRUE(st.ok()); |
| // open |
| st = sink.open(&state); |
| ASSERT_TRUE(st.ok()); |
| // send |
| int slot_count = tuple_desc->slots().size(); |
| std::vector<vectorized::MutableColumnPtr> columns(slot_count); |
| for (int i = 0; i < slot_count; i++) { |
| columns[i] = tuple_desc->slots()[i]->get_empty_mutable_column(); |
| } |
| |
| int col_idx = 0; |
| auto* column_ptr = columns[col_idx++].get(); |
| auto column_vector_int = column_ptr; |
| int int_val = 12; |
| column_vector_int->insert_data((const char*)&int_val, 0); |
| int_val = 13; |
| column_vector_int->insert_data((const char*)&int_val, 0); |
| int_val = 14; |
| column_vector_int->insert_data((const char*)&int_val, 0); |
| |
| column_ptr = columns[col_idx++].get(); |
| auto column_vector_dec = column_ptr; |
| DecimalV2Value dec_val(std::string("12.3")); |
| column_vector_dec->insert_data((const char*)&dec_val, 0); |
| dec_val = std::string("123.123456789"); |
| column_vector_dec->insert_data((const char*)&dec_val, 0); |
| dec_val = std::string("123456789123.1234"); |
| column_vector_dec->insert_data((const char*)&dec_val, 0); |
| |
| vectorized::Block block; |
| col_idx = 0; |
| for (const auto slot_desc : tuple_desc->slots()) { |
| block.insert(vectorized::ColumnWithTypeAndName(std::move(columns[col_idx++]), |
| slot_desc->get_data_type_ptr(), |
| slot_desc->col_name())); |
| } |
| st = sink.send(&state, &block); |
| ASSERT_TRUE(st.ok()); |
| // close |
| st = sink.close(&state, Status::OK()); |
| ASSERT_TRUE(st.ok() || st.to_string() == "Internal error: wait close failed. ") |
| << st.to_string(); |
| |
| ASSERT_EQ(2, output_set.size()); |
| ASSERT_TRUE(output_set.count("(12, 12.300000000)") > 0); |
| ASSERT_TRUE(output_set.count("(13, 123.120000000)") > 0); |
| } |
| |
| } // namespace vectorized |
| } // namespace doris |