blob: 7b4e1120e6954eec4681f9ae97db1c1380975ab8 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/tablet_sink.h"
#include <gtest/gtest.h>
#include "common/config.h"
#include "gen_cpp/HeartbeatService_types.h"
#include "gen_cpp/internal_service.pb.h"
#include "runtime/bufferpool/reservation_tracker.h"
#include "runtime/decimal_value.h"
#include "runtime/descriptor_helper.h"
#include "runtime/exec_env.h"
#include "runtime/result_queue_mgr.h"
#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
#include "runtime/stream_load/load_stream_mgr.h"
#include "runtime/thread_resource_mgr.h"
#include "runtime/tuple_row.h"
#include "service/brpc.h"
#include "util/brpc_stub_cache.h"
#include "util/cpu_info.h"
#include "util/debug/leakcheck_disabler.h"
namespace doris {
namespace stream_load {
Status k_add_batch_status;
class OlapTableSinkTest : public testing::Test {
public:
OlapTableSinkTest() {}
virtual ~OlapTableSinkTest() {}
void SetUp() override {
k_add_batch_status = Status::OK();
_env = ExecEnv::GetInstance();
_env->_thread_mgr = new ThreadResourceMgr();
_env->_master_info = new TMasterInfo();
_env->_load_stream_mgr = new LoadStreamMgr();
_env->_brpc_stub_cache = new BrpcStubCache();
_env->_buffer_reservation = new ReservationTracker();
config::tablet_writer_open_rpc_timeout_sec = 60;
}
void TearDown() override {
SAFE_DELETE(_env->_brpc_stub_cache);
SAFE_DELETE(_env->_load_stream_mgr);
SAFE_DELETE(_env->_master_info);
SAFE_DELETE(_env->_thread_mgr);
SAFE_DELETE(_env->_buffer_reservation);
if (_server) {
_server->Stop(100);
_server->Join();
SAFE_DELETE(_server);
}
}
private:
ExecEnv* _env = nullptr;
brpc::Server* _server = nullptr;
};
TDataSink get_data_sink(TDescriptorTable* desc_tbl) {
int64_t db_id = 1;
int64_t table_id = 2;
int64_t partition_id = 3;
int64_t index1_id = 4;
int64_t tablet1_id = 6;
int64_t tablet2_id = 7;
TDataSink data_sink;
data_sink.type = TDataSinkType::OLAP_TABLE_SINK;
data_sink.__isset.olap_table_sink = true;
TOlapTableSink& tsink = data_sink.olap_table_sink;
tsink.load_id.hi = 123;
tsink.load_id.lo = 456;
tsink.txn_id = 789;
tsink.db_id = 1;
tsink.table_id = 2;
tsink.tuple_id = 0;
tsink.num_replicas = 3;
tsink.db_name = "testDb";
tsink.table_name = "testTable";
// construct schema
TOlapTableSchemaParam& tschema = tsink.schema;
tschema.db_id = 1;
tschema.table_id = 2;
tschema.version = 0;
// descriptor
{
TDescriptorTableBuilder dtb;
{
TTupleDescriptorBuilder tuple_builder;
tuple_builder.add_slot(TSlotDescriptorBuilder()
.type(TYPE_INT)
.column_name("c1")
.column_pos(1)
.build());
tuple_builder.add_slot(TSlotDescriptorBuilder()
.type(TYPE_BIGINT)
.column_name("c2")
.column_pos(2)
.build());
tuple_builder.add_slot(TSlotDescriptorBuilder()
.string_type(10)
.column_name("c3")
.column_pos(3)
.build());
tuple_builder.build(&dtb);
}
{
TTupleDescriptorBuilder tuple_builder;
tuple_builder.add_slot(TSlotDescriptorBuilder()
.type(TYPE_INT)
.column_name("c1")
.column_pos(1)
.build());
tuple_builder.add_slot(TSlotDescriptorBuilder()
.type(TYPE_BIGINT)
.column_name("c2")
.column_pos(2)
.build());
tuple_builder.add_slot(TSlotDescriptorBuilder()
.string_type(20)
.column_name("c3")
.column_pos(3)
.build());
tuple_builder.build(&dtb);
}
*desc_tbl = dtb.desc_tbl();
tschema.slot_descs = desc_tbl->slotDescriptors;
tschema.tuple_desc = desc_tbl->tupleDescriptors[0];
}
// index
tschema.indexes.resize(1);
tschema.indexes[0].id = index1_id;
tschema.indexes[0].columns = {"c1", "c2", "c3"};
// tschema.indexes[1].id = 5;
// tschema.indexes[1].columns = {"c1", "c3"};
// partition
TOlapTablePartitionParam& tpartition = tsink.partition;
tpartition.db_id = db_id;
tpartition.table_id = table_id;
tpartition.version = table_id;
tpartition.__set_partition_column("c2");
tpartition.__set_distributed_columns({"c1", "c3"});
tpartition.partitions.resize(1);
tpartition.partitions[0].id = partition_id;
tpartition.partitions[0].num_buckets = 2;
tpartition.partitions[0].indexes.resize(1);
tpartition.partitions[0].indexes[0].index_id = index1_id;
tpartition.partitions[0].indexes[0].tablets = {tablet1_id, tablet2_id};
// location
TOlapTableLocationParam& location = tsink.location;
location.db_id = db_id;
location.table_id = table_id;
location.version = 0;
location.tablets.resize(2);
location.tablets[0].tablet_id = tablet1_id;
location.tablets[0].node_ids = {0, 1, 2};
location.tablets[1].tablet_id = tablet2_id;
location.tablets[1].node_ids = {0, 1, 2};
// location
TPaloNodesInfo& nodes_info = tsink.nodes_info;
nodes_info.nodes.resize(3);
nodes_info.nodes[0].id = 0;
nodes_info.nodes[0].host = "127.0.0.1";
nodes_info.nodes[0].async_internal_port = 4356;
nodes_info.nodes[1].id = 1;
nodes_info.nodes[1].host = "127.0.0.1";
nodes_info.nodes[1].async_internal_port = 4356;
nodes_info.nodes[2].id = 2;
nodes_info.nodes[2].host = "127.0.0.1";
nodes_info.nodes[2].async_internal_port = 4357;
return data_sink;
}
TDataSink get_decimal_sink(TDescriptorTable* desc_tbl) {
int64_t db_id = 1;
int64_t table_id = 2;
int64_t partition_id = 3;
int64_t index1_id = 4;
int64_t tablet1_id = 6;
int64_t tablet2_id = 7;
TDataSink data_sink;
data_sink.type = TDataSinkType::OLAP_TABLE_SINK;
data_sink.__isset.olap_table_sink = true;
TOlapTableSink& tsink = data_sink.olap_table_sink;
tsink.load_id.hi = 123;
tsink.load_id.lo = 456;
tsink.txn_id = 789;
tsink.db_id = 1;
tsink.table_id = 2;
tsink.tuple_id = 0;
tsink.num_replicas = 3;
tsink.db_name = "testDb";
tsink.table_name = "testTable";
// construct schema
TOlapTableSchemaParam& tschema = tsink.schema;
tschema.db_id = 1;
tschema.table_id = 2;
tschema.version = 0;
// descriptor
{
TDescriptorTableBuilder dtb;
{
TTupleDescriptorBuilder tuple_builder;
tuple_builder.add_slot(TSlotDescriptorBuilder()
.type(TYPE_INT)
.column_name("c1")
.column_pos(1)
.build());
tuple_builder.add_slot(TSlotDescriptorBuilder()
.decimal_type(5, 2)
.column_name("c2")
.column_pos(2)
.build());
tuple_builder.build(&dtb);
}
*desc_tbl = dtb.desc_tbl();
tschema.slot_descs = desc_tbl->slotDescriptors;
tschema.tuple_desc = desc_tbl->tupleDescriptors[0];
}
// index
tschema.indexes.resize(1);
tschema.indexes[0].id = index1_id;
tschema.indexes[0].columns = {"c1", "c2"};
// tschema.indexes[1].id = 5;
// tschema.indexes[1].columns = {"c1", "c3"};
// partition
TOlapTablePartitionParam& tpartition = tsink.partition;
tpartition.db_id = db_id;
tpartition.table_id = table_id;
tpartition.version = table_id;
tpartition.__set_partition_column("c1");
tpartition.__set_distributed_columns({"c2"});
tpartition.partitions.resize(1);
tpartition.partitions[0].id = partition_id;
tpartition.partitions[0].num_buckets = 2;
tpartition.partitions[0].indexes.resize(1);
tpartition.partitions[0].indexes[0].index_id = index1_id;
tpartition.partitions[0].indexes[0].tablets = {tablet1_id, tablet2_id};
// location
TOlapTableLocationParam& location = tsink.location;
location.db_id = db_id;
location.table_id = table_id;
location.version = 0;
location.tablets.resize(2);
location.tablets[0].tablet_id = tablet1_id;
location.tablets[0].node_ids = {0, 1, 2};
location.tablets[1].tablet_id = tablet2_id;
location.tablets[1].node_ids = {0, 1, 2};
// location
TPaloNodesInfo& nodes_info = tsink.nodes_info;
nodes_info.nodes.resize(3);
nodes_info.nodes[0].id = 0;
nodes_info.nodes[0].host = "127.0.0.1";
nodes_info.nodes[0].async_internal_port = 4356;
nodes_info.nodes[1].id = 1;
nodes_info.nodes[1].host = "127.0.0.1";
nodes_info.nodes[1].async_internal_port = 4356;
nodes_info.nodes[2].id = 2;
nodes_info.nodes[2].host = "127.0.0.1";
nodes_info.nodes[2].async_internal_port = 4357;
return data_sink;
}
class TestInternalService : public PBackendService {
public:
TestInternalService() {}
virtual ~TestInternalService() {}
void transmit_data(::google::protobuf::RpcController* controller,
const ::doris::PTransmitDataParams* request,
::doris::PTransmitDataResult* response,
::google::protobuf::Closure* done) override {
brpc::ClosureGuard done_guard(done);
}
void tablet_writer_open(google::protobuf::RpcController* controller,
const PTabletWriterOpenRequest* request,
PTabletWriterOpenResult* response,
google::protobuf::Closure* done) override {
brpc::ClosureGuard done_guard(done);
Status status;
status.to_protobuf(response->mutable_status());
}
void tablet_writer_add_batch(google::protobuf::RpcController* controller,
const PTabletWriterAddBatchRequest* request,
PTabletWriterAddBatchResult* response,
google::protobuf::Closure* done) override {
brpc::ClosureGuard done_guard(done);
{
std::lock_guard<std::mutex> l(_lock);
_row_counters += request->tablet_ids_size();
if (request->eos()) {
_eof_counters++;
}
k_add_batch_status.to_protobuf(response->mutable_status());
if (request->has_row_batch() && _row_desc != nullptr) {
auto tracker = std::make_shared<MemTracker>();
RowBatch batch(*_row_desc, request->row_batch(), tracker.get());
for (int i = 0; i < batch.num_rows(); ++i) {
LOG(INFO) << batch.get_row(i)->to_string(*_row_desc);
_output_set->emplace(batch.get_row(i)->to_string(*_row_desc));
}
}
}
}
void tablet_writer_cancel(google::protobuf::RpcController* controller,
const PTabletWriterCancelRequest* request,
PTabletWriterCancelResult* response,
google::protobuf::Closure* done) override {
brpc::ClosureGuard done_guard(done);
}
std::mutex _lock;
int64_t _eof_counters = 0;
int64_t _row_counters = 0;
RowDescriptor* _row_desc = nullptr;
std::set<std::string>* _output_set = nullptr;
};
TEST_F(OlapTableSinkTest, normal) {
// start brpc service first
_server = new brpc::Server();
auto service = new TestInternalService();
ASSERT_EQ(_server->AddService(service, brpc::SERVER_OWNS_SERVICE), 0);
brpc::ServerOptions options;
{
debug::ScopedLeakCheckDisabler disable_lsan;
_server->Start(4356, &options);
}
TUniqueId fragment_id;
TQueryOptions query_options;
query_options.batch_size = 1;
RuntimeState state(fragment_id, query_options, TQueryGlobals(), _env);
state.init_mem_trackers(TUniqueId());
// state._query_mem_tracker.reset(new MemTracker());
// state._instance_mem_tracker.reset(new MemTracker(-1, "test", state._query_mem_tracker.get()));
ObjectPool obj_pool;
TDescriptorTable tdesc_tbl;
auto t_data_sink = get_data_sink(&tdesc_tbl);
// crate desc_tabl
DescriptorTbl* desc_tbl = nullptr;
auto st = DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
ASSERT_TRUE(st.ok());
state._desc_tbl = desc_tbl;
TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
LOG(INFO) << "tuple_desc=" << tuple_desc->debug_string();
RowDescriptor row_desc(*desc_tbl, {0}, {false});
OlapTableSink sink(&obj_pool, row_desc, {}, &st);
ASSERT_TRUE(st.ok());
// init
st = sink.init(t_data_sink);
ASSERT_TRUE(st.ok());
// prepare
st = sink.prepare(&state);
ASSERT_TRUE(st.ok());
// open
st = sink.open(&state);
ASSERT_TRUE(st.ok());
// send
auto tracker = std::make_shared<MemTracker>();
RowBatch batch(row_desc, 1024, tracker.get());
// 12, 9, "abc"
{
Tuple* tuple = (Tuple*)batch.tuple_data_pool()->allocate(tuple_desc->byte_size());
batch.get_row(batch.add_row())->set_tuple(0, tuple);
memset(tuple, 0, tuple_desc->byte_size());
*reinterpret_cast<int*>(tuple->get_slot(4)) = 12;
*reinterpret_cast<int64_t*>(tuple->get_slot(8)) = 9;
StringValue* str_val = reinterpret_cast<StringValue*>(tuple->get_slot(16));
str_val->ptr = (char*)batch.tuple_data_pool()->allocate(10);
str_val->len = 3;
memcpy(str_val->ptr, "abc", str_val->len);
batch.commit_last_row();
}
// 13, 25, "abcd"
{
Tuple* tuple = (Tuple*)batch.tuple_data_pool()->allocate(tuple_desc->byte_size());
batch.get_row(batch.add_row())->set_tuple(0, tuple);
memset(tuple, 0, tuple_desc->byte_size());
*reinterpret_cast<int*>(tuple->get_slot(4)) = 13;
*reinterpret_cast<int64_t*>(tuple->get_slot(8)) = 25;
StringValue* str_val = reinterpret_cast<StringValue*>(tuple->get_slot(16));
str_val->ptr = (char*)batch.tuple_data_pool()->allocate(10);
str_val->len = 4;
memcpy(str_val->ptr, "abcd", str_val->len);
batch.commit_last_row();
}
// 14, 50, "abcde"
{
Tuple* tuple = (Tuple*)batch.tuple_data_pool()->allocate(tuple_desc->byte_size());
batch.get_row(batch.add_row())->set_tuple(0, tuple);
memset(tuple, 0, tuple_desc->byte_size());
*reinterpret_cast<int*>(tuple->get_slot(4)) = 14;
*reinterpret_cast<int64_t*>(tuple->get_slot(8)) = 50;
StringValue* str_val = reinterpret_cast<StringValue*>(tuple->get_slot(16));
str_val->ptr = reinterpret_cast<char*>(batch.tuple_data_pool()->allocate(16));
str_val->len = 15;
memcpy(str_val->ptr, "abcde1234567890", str_val->len);
batch.commit_last_row();
}
st = sink.send(&state, &batch);
ASSERT_TRUE(st.ok());
// close
st = sink.close(&state, Status::OK());
ASSERT_TRUE(st.ok() || st.to_string() == "Internal error: already stopped, skip waiting for close. cancelled/!eos: : 1/1") << st.to_string();
// each node has a eof
ASSERT_EQ(2, service->_eof_counters);
ASSERT_EQ(2 * 2, service->_row_counters);
// 2node * 2
ASSERT_EQ(1, state.num_rows_load_filtered());
}
TEST_F(OlapTableSinkTest, convert) {
// start brpc service first
_server = new brpc::Server();
auto service = new TestInternalService();
ASSERT_EQ(_server->AddService(service, brpc::SERVER_OWNS_SERVICE), 0);
brpc::ServerOptions options;
{
debug::ScopedLeakCheckDisabler disable_lsan;
_server->Start(4356, &options);
}
TUniqueId fragment_id;
TQueryOptions query_options;
query_options.batch_size = 1024;
RuntimeState state(fragment_id, query_options, TQueryGlobals(), _env);
state.init_mem_trackers(TUniqueId());
ObjectPool obj_pool;
TDescriptorTable tdesc_tbl;
auto t_data_sink = get_data_sink(&tdesc_tbl);
// crate desc_tabl
DescriptorTbl* desc_tbl = nullptr;
auto st = DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
ASSERT_TRUE(st.ok());
state._desc_tbl = desc_tbl;
TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
RowDescriptor row_desc(*desc_tbl, {0}, {false});
// expr
std::vector<TExpr> exprs;
exprs.resize(3);
exprs[0].nodes.resize(1);
exprs[0].nodes[0].node_type = TExprNodeType::SLOT_REF;
exprs[0].nodes[0].type = tdesc_tbl.slotDescriptors[3].slotType;
exprs[0].nodes[0].num_children = 0;
exprs[0].nodes[0].__isset.slot_ref = true;
exprs[0].nodes[0].slot_ref.slot_id = 0;
exprs[0].nodes[0].slot_ref.tuple_id = 1;
exprs[1].nodes.resize(1);
exprs[1].nodes[0].node_type = TExprNodeType::SLOT_REF;
exprs[1].nodes[0].type = tdesc_tbl.slotDescriptors[4].slotType;
exprs[1].nodes[0].num_children = 0;
exprs[1].nodes[0].__isset.slot_ref = true;
exprs[1].nodes[0].slot_ref.slot_id = 1;
exprs[1].nodes[0].slot_ref.tuple_id = 1;
exprs[2].nodes.resize(1);
exprs[2].nodes[0].node_type = TExprNodeType::SLOT_REF;
exprs[2].nodes[0].type = tdesc_tbl.slotDescriptors[5].slotType;
exprs[2].nodes[0].num_children = 0;
exprs[2].nodes[0].__isset.slot_ref = true;
exprs[2].nodes[0].slot_ref.slot_id = 2;
exprs[2].nodes[0].slot_ref.tuple_id = 1;
OlapTableSink sink(&obj_pool, row_desc, exprs, &st);
ASSERT_TRUE(st.ok());
// set output tuple_id
t_data_sink.olap_table_sink.tuple_id = 1;
// init
st = sink.init(t_data_sink);
ASSERT_TRUE(st.ok());
// prepare
st = sink.prepare(&state);
ASSERT_TRUE(st.ok());
// open
st = sink.open(&state);
ASSERT_TRUE(st.ok());
// send
auto tracker = std::make_shared<MemTracker>();
RowBatch batch(row_desc, 1024, tracker.get());
// 12, 9, "abc"
{
Tuple* tuple = (Tuple*)batch.tuple_data_pool()->allocate(tuple_desc->byte_size());
batch.get_row(batch.add_row())->set_tuple(0, tuple);
memset(tuple, 0, tuple_desc->byte_size());
*reinterpret_cast<int*>(tuple->get_slot(4)) = 12;
*reinterpret_cast<int64_t*>(tuple->get_slot(8)) = 9;
StringValue* str_val = reinterpret_cast<StringValue*>(tuple->get_slot(16));
str_val->ptr = (char*)batch.tuple_data_pool()->allocate(10);
str_val->len = 3;
memcpy(str_val->ptr, "abc", str_val->len);
batch.commit_last_row();
}
// 13, 25, "abcd"
{
Tuple* tuple = (Tuple*)batch.tuple_data_pool()->allocate(tuple_desc->byte_size());
batch.get_row(batch.add_row())->set_tuple(0, tuple);
memset(tuple, 0, tuple_desc->byte_size());
*reinterpret_cast<int*>(tuple->get_slot(4)) = 13;
*reinterpret_cast<int64_t*>(tuple->get_slot(8)) = 25;
StringValue* str_val = reinterpret_cast<StringValue*>(tuple->get_slot(16));
str_val->ptr = (char*)batch.tuple_data_pool()->allocate(10);
str_val->len = 4;
memcpy(str_val->ptr, "abcd", str_val->len);
batch.commit_last_row();
}
// 14, 50, "abcde"
{
Tuple* tuple = (Tuple*)batch.tuple_data_pool()->allocate(tuple_desc->byte_size());
batch.get_row(batch.add_row())->set_tuple(0, tuple);
memset(tuple, 0, tuple_desc->byte_size());
*reinterpret_cast<int*>(tuple->get_slot(4)) = 14;
*reinterpret_cast<int64_t*>(tuple->get_slot(8)) = 50;
StringValue* str_val = reinterpret_cast<StringValue*>(tuple->get_slot(16));
str_val->ptr = reinterpret_cast<char*>(batch.tuple_data_pool()->allocate(10));
str_val->len = 5;
memcpy(str_val->ptr, "abcde", str_val->len);
batch.commit_last_row();
}
st = sink.send(&state, &batch);
ASSERT_TRUE(st.ok());
// close
st = sink.close(&state, Status::OK());
ASSERT_TRUE(st.ok() || st.to_string() == "Internal error: already stopped, skip waiting for close. cancelled/!eos: : 1/1") << st.to_string();
// each node has a eof
ASSERT_EQ(2, service->_eof_counters);
ASSERT_EQ(2 * 3, service->_row_counters);
// 2node * 2
ASSERT_EQ(0, state.num_rows_load_filtered());
}
TEST_F(OlapTableSinkTest, init_fail1) {
TUniqueId fragment_id;
TQueryOptions query_options;
query_options.batch_size = 1;
RuntimeState state(fragment_id, query_options, TQueryGlobals(), _env);
state.init_mem_trackers(TUniqueId());
ObjectPool obj_pool;
TDescriptorTable tdesc_tbl;
auto t_data_sink = get_data_sink(&tdesc_tbl);
// crate desc_tabl
DescriptorTbl* desc_tbl = nullptr;
auto st = DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
ASSERT_TRUE(st.ok());
state._desc_tbl = desc_tbl;
RowDescriptor row_desc(*desc_tbl, {0}, {false});
// expr
std::vector<TExpr> exprs;
exprs.resize(1);
exprs[0].nodes.resize(1);
exprs[0].nodes[0].node_type = TExprNodeType::SLOT_REF;
exprs[0].nodes[0].type = tdesc_tbl.slotDescriptors[3].slotType;
exprs[0].nodes[0].num_children = 0;
exprs[0].nodes[0].__isset.slot_ref = true;
exprs[0].nodes[0].slot_ref.slot_id = 0;
exprs[0].nodes[0].slot_ref.tuple_id = 1;
{
OlapTableSink sink(&obj_pool, row_desc, exprs, &st);
ASSERT_TRUE(st.ok());
// set output tuple_id
t_data_sink.olap_table_sink.tuple_id = 5;
// init
st = sink.init(t_data_sink);
ASSERT_TRUE(st.ok());
st = sink.prepare(&state);
EXPECT_FALSE(st.ok());
sink.close(&state, st);
}
{
OlapTableSink sink(&obj_pool, row_desc, exprs, &st);
ASSERT_TRUE(st.ok());
// set output tuple_id
t_data_sink.olap_table_sink.tuple_id = 1;
// init
st = sink.init(t_data_sink);
ASSERT_TRUE(st.ok());
st = sink.prepare(&state);
EXPECT_FALSE(st.ok());
sink.close(&state, st);
}
}
TEST_F(OlapTableSinkTest, init_fail3) {
TUniqueId fragment_id;
TQueryOptions query_options;
query_options.batch_size = 1;
RuntimeState state(fragment_id, query_options, TQueryGlobals(), _env);
state.init_mem_trackers(TUniqueId());
ObjectPool obj_pool;
TDescriptorTable tdesc_tbl;
auto t_data_sink = get_data_sink(&tdesc_tbl);
// crate desc_tabl
DescriptorTbl* desc_tbl = nullptr;
auto st = DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
ASSERT_TRUE(st.ok());
state._desc_tbl = desc_tbl;
RowDescriptor row_desc(*desc_tbl, {0}, {false});
// expr
std::vector<TExpr> exprs;
exprs.resize(3);
exprs[0].nodes.resize(1);
exprs[0].nodes[0].node_type = TExprNodeType::SLOT_REF;
exprs[0].nodes[0].type = tdesc_tbl.slotDescriptors[3].slotType;
exprs[0].nodes[0].num_children = 0;
exprs[0].nodes[0].__isset.slot_ref = true;
exprs[0].nodes[0].slot_ref.slot_id = 0;
exprs[0].nodes[0].slot_ref.tuple_id = 1;
exprs[1].nodes.resize(1);
exprs[1].nodes[0].node_type = TExprNodeType::SLOT_REF;
exprs[1].nodes[0].type = tdesc_tbl.slotDescriptors[3].slotType;
exprs[1].nodes[0].num_children = 0;
exprs[1].nodes[0].__isset.slot_ref = true;
exprs[1].nodes[0].slot_ref.slot_id = 1;
exprs[1].nodes[0].slot_ref.tuple_id = 1;
exprs[2].nodes.resize(1);
exprs[2].nodes[0].node_type = TExprNodeType::SLOT_REF;
exprs[2].nodes[0].type = tdesc_tbl.slotDescriptors[5].slotType;
exprs[2].nodes[0].num_children = 0;
exprs[2].nodes[0].__isset.slot_ref = true;
exprs[2].nodes[0].slot_ref.slot_id = 2;
exprs[2].nodes[0].slot_ref.tuple_id = 1;
OlapTableSink sink(&obj_pool, row_desc, exprs, &st);
ASSERT_TRUE(st.ok());
// set output tuple_id
t_data_sink.olap_table_sink.tuple_id = 1;
// init
st = sink.init(t_data_sink);
ASSERT_TRUE(st.ok());
st = sink.prepare(&state);
EXPECT_FALSE(st.ok());
sink.close(&state, st);
}
TEST_F(OlapTableSinkTest, init_fail4) {
TUniqueId fragment_id;
TQueryOptions query_options;
query_options.batch_size = 1;
RuntimeState state(fragment_id, query_options, TQueryGlobals(), _env);
state.init_mem_trackers(TUniqueId());
ObjectPool obj_pool;
TDescriptorTable tdesc_tbl;
auto t_data_sink = get_data_sink(&tdesc_tbl);
// crate desc_tabl
DescriptorTbl* desc_tbl = nullptr;
auto st = DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
ASSERT_TRUE(st.ok());
state._desc_tbl = desc_tbl;
RowDescriptor row_desc(*desc_tbl, {0}, {false});
// expr
std::vector<TExpr> exprs;
exprs.resize(3);
exprs[0].nodes.resize(1);
exprs[0].nodes[0].node_type = TExprNodeType::SLOT_REF;
exprs[0].nodes[0].type = tdesc_tbl.slotDescriptors[3].slotType;
exprs[0].nodes[0].num_children = 0;
exprs[0].nodes[0].__isset.slot_ref = true;
exprs[0].nodes[0].slot_ref.slot_id = 0;
exprs[0].nodes[0].slot_ref.tuple_id = 1;
exprs[1].nodes.resize(1);
exprs[1].nodes[0].node_type = TExprNodeType::SLOT_REF;
exprs[1].nodes[0].type = tdesc_tbl.slotDescriptors[4].slotType;
exprs[1].nodes[0].num_children = 0;
exprs[1].nodes[0].__isset.slot_ref = true;
exprs[1].nodes[0].slot_ref.slot_id = 1;
exprs[1].nodes[0].slot_ref.tuple_id = 1;
exprs[2].nodes.resize(1);
exprs[2].nodes[0].node_type = TExprNodeType::SLOT_REF;
exprs[2].nodes[0].type = tdesc_tbl.slotDescriptors[5].slotType;
exprs[2].nodes[0].num_children = 0;
exprs[2].nodes[0].__isset.slot_ref = true;
exprs[2].nodes[0].slot_ref.slot_id = 2;
exprs[2].nodes[0].slot_ref.tuple_id = 1;
OlapTableSink sink(&obj_pool, row_desc, exprs, &st);
ASSERT_TRUE(st.ok());
// set output tuple_id
t_data_sink.olap_table_sink.tuple_id = 1;
// init
t_data_sink.olap_table_sink.partition.partitions[0].indexes[0].tablets = {101, 102};
st = sink.init(t_data_sink);
ASSERT_TRUE(st.ok());
st = sink.prepare(&state);
EXPECT_FALSE(st.ok());
sink.close(&state, st);
}
TEST_F(OlapTableSinkTest, add_batch_failed) {
// start brpc service first
_server = new brpc::Server();
auto service = new TestInternalService();
ASSERT_EQ(_server->AddService(service, brpc::SERVER_OWNS_SERVICE), 0);
brpc::ServerOptions options;
{
debug::ScopedLeakCheckDisabler disable_lsan;
_server->Start(4356, &options);
}
// ObjectPool create before RuntimeState, simulate actual situation better.
ObjectPool obj_pool;
TUniqueId fragment_id;
TQueryOptions query_options;
query_options.batch_size = 1;
RuntimeState state(fragment_id, query_options, TQueryGlobals(), _env);
state.init_mem_trackers(TUniqueId());
TDescriptorTable tdesc_tbl;
auto t_data_sink = get_data_sink(&tdesc_tbl);
// crate desc_tabl
DescriptorTbl* desc_tbl = nullptr;
auto st = DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
ASSERT_TRUE(st.ok());
state._desc_tbl = desc_tbl;
RowDescriptor row_desc(*desc_tbl, {0}, {false});
// expr
std::vector<TExpr> exprs;
exprs.resize(3);
exprs[0].nodes.resize(1);
exprs[0].nodes[0].node_type = TExprNodeType::SLOT_REF;
exprs[0].nodes[0].type = tdesc_tbl.slotDescriptors[3].slotType;
exprs[0].nodes[0].num_children = 0;
exprs[0].nodes[0].__isset.slot_ref = true;
exprs[0].nodes[0].slot_ref.slot_id = 0;
exprs[0].nodes[0].slot_ref.tuple_id = 1;
exprs[1].nodes.resize(1);
exprs[1].nodes[0].node_type = TExprNodeType::SLOT_REF;
exprs[1].nodes[0].type = tdesc_tbl.slotDescriptors[4].slotType;
exprs[1].nodes[0].num_children = 0;
exprs[1].nodes[0].__isset.slot_ref = true;
exprs[1].nodes[0].slot_ref.slot_id = 1;
exprs[1].nodes[0].slot_ref.tuple_id = 1;
exprs[2].nodes.resize(1);
exprs[2].nodes[0].node_type = TExprNodeType::SLOT_REF;
exprs[2].nodes[0].type = tdesc_tbl.slotDescriptors[5].slotType;
exprs[2].nodes[0].num_children = 0;
exprs[2].nodes[0].__isset.slot_ref = true;
exprs[2].nodes[0].slot_ref.slot_id = 2;
exprs[2].nodes[0].slot_ref.tuple_id = 1;
OlapTableSink sink(&obj_pool, row_desc, exprs, &st);
ASSERT_TRUE(st.ok());
// set output tuple_id
t_data_sink.olap_table_sink.tuple_id = 1;
// init
st = sink.init(t_data_sink);
ASSERT_TRUE(st.ok());
st = sink.prepare(&state);
ASSERT_TRUE(st.ok());
st = sink.open(&state);
ASSERT_TRUE(st.ok());
// send
auto tracker = std::make_shared<MemTracker>();
RowBatch batch(row_desc, 1024, tracker.get());
TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
// 12, 9, "abc"
{
Tuple* tuple = (Tuple*)batch.tuple_data_pool()->allocate(tuple_desc->byte_size());
batch.get_row(batch.add_row())->set_tuple(0, tuple);
memset(tuple, 0, tuple_desc->byte_size());
*reinterpret_cast<int*>(tuple->get_slot(4)) = 12;
*reinterpret_cast<int64_t*>(tuple->get_slot(8)) = 9;
StringValue* str_val = reinterpret_cast<StringValue*>(tuple->get_slot(16));
str_val->ptr = (char*)batch.tuple_data_pool()->allocate(10);
str_val->len = 3;
memcpy(str_val->ptr, "abc", str_val->len);
batch.commit_last_row();
}
// Channels will be cancelled internally, coz brpc returns k_add_batch_status.
k_add_batch_status = Status::InternalError("dummy failed");
st = sink.send(&state, &batch);
ASSERT_TRUE(st.ok());
// Send batch multiple times, can make _cur_batch or _pending_batches(in channels) not empty.
// To ensure the order of releasing resource is OK.
sink.send(&state, &batch);
sink.send(&state, &batch);
// close
st = sink.close(&state, Status::OK());
ASSERT_FALSE(st.ok());
}
TEST_F(OlapTableSinkTest, decimal) {
// start brpc service first
_server = new brpc::Server();
auto service = new TestInternalService();
ASSERT_EQ(_server->AddService(service, brpc::SERVER_OWNS_SERVICE), 0);
brpc::ServerOptions options;
{
debug::ScopedLeakCheckDisabler disable_lsan;
_server->Start(4356, &options);
}
TUniqueId fragment_id;
TQueryOptions query_options;
query_options.batch_size = 1;
RuntimeState state(fragment_id, query_options, TQueryGlobals(), _env);
state.init_mem_trackers(TUniqueId());
ObjectPool obj_pool;
TDescriptorTable tdesc_tbl;
auto t_data_sink = get_decimal_sink(&tdesc_tbl);
// crate desc_tabl
DescriptorTbl* desc_tbl = nullptr;
auto st = DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
ASSERT_TRUE(st.ok());
state._desc_tbl = desc_tbl;
TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
LOG(INFO) << "tuple_desc=" << tuple_desc->debug_string();
RowDescriptor row_desc(*desc_tbl, {0}, {false});
service->_row_desc = &row_desc;
std::set<std::string> output_set;
service->_output_set = &output_set;
OlapTableSink sink(&obj_pool, row_desc, {}, &st);
ASSERT_TRUE(st.ok());
// init
st = sink.init(t_data_sink);
ASSERT_TRUE(st.ok());
// prepare
st = sink.prepare(&state);
ASSERT_TRUE(st.ok());
// open
st = sink.open(&state);
ASSERT_TRUE(st.ok());
// send
auto tracker = std::make_shared<MemTracker>();
RowBatch batch(row_desc, 1024, tracker.get());
// 12, 12.3
{
Tuple* tuple = (Tuple*)batch.tuple_data_pool()->allocate(tuple_desc->byte_size());
batch.get_row(batch.add_row())->set_tuple(0, tuple);
memset(tuple, 0, tuple_desc->byte_size());
*reinterpret_cast<int*>(tuple->get_slot(4)) = 12;
DecimalValue* dec_val = reinterpret_cast<DecimalValue*>(tuple->get_slot(16));
*dec_val = DecimalValue("12.3");
batch.commit_last_row();
}
// 13, 123.123456789
{
Tuple* tuple = (Tuple*)batch.tuple_data_pool()->allocate(tuple_desc->byte_size());
batch.get_row(batch.add_row())->set_tuple(0, tuple);
memset(tuple, 0, tuple_desc->byte_size());
*reinterpret_cast<int*>(tuple->get_slot(4)) = 13;
DecimalValue* dec_val = reinterpret_cast<DecimalValue*>(tuple->get_slot(16));
*dec_val = DecimalValue("123.123456789");
batch.commit_last_row();
}
// 14, 123456789123.1234
{
Tuple* tuple = (Tuple*)batch.tuple_data_pool()->allocate(tuple_desc->byte_size());
batch.get_row(batch.add_row())->set_tuple(0, tuple);
memset(tuple, 0, tuple_desc->byte_size());
*reinterpret_cast<int*>(tuple->get_slot(4)) = 14;
DecimalValue* dec_val = reinterpret_cast<DecimalValue*>(tuple->get_slot(16));
*dec_val = DecimalValue("123456789123.1234");
batch.commit_last_row();
}
st = sink.send(&state, &batch);
ASSERT_TRUE(st.ok());
// close
st = sink.close(&state, Status::OK());
ASSERT_TRUE(st.ok() || st.to_string() == "Internal error: already stopped, skip waiting for close. cancelled/!eos: : 1/1") << st.to_string();
ASSERT_EQ(2, output_set.size());
ASSERT_TRUE(output_set.count("[(12 12.3)]") > 0);
ASSERT_TRUE(output_set.count("[(13 123.12)]") > 0);
// ASSERT_TRUE(output_set.count("[(14 999.99)]") > 0);
}
} // namespace stream_load
} // namespace doris
int main(int argc, char* argv[]) {
doris::CpuInfo::init();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}