blob: 9c8e879877991e0fb4251f04928358d3dba5fc4f [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/sink/vrow_distribution.h"
#include <gen_cpp/Exprs_types.h>
#include <gen_cpp/FrontendService_types.h>
#include <gen_cpp/Partitions_types.h>
#include <gtest/gtest.h>
#include <cstdint>
#include <functional>
#include <memory>
#include <vector>
#include "common/config.h"
#include "core/data_type/data_type_number.h"
#include "exec/operator/operator_helper.h"
#include "exec/sink/sink_test_utils.h"
#include "exec/sink/vtablet_block_convertor.h"
#include "exec/sink/vtablet_finder.h"
#include "testutil/column_helper.h"
#include "testutil/mock/mock_slot_ref.h"
#include "util/debug_points.h"
namespace doris {
namespace {
using doris::OperatorContext;
struct VRowDistributionHarness {
std::shared_ptr<OlapTableSchemaParam> schema;
std::unique_ptr<VOlapTablePartitionParam> vpartition;
std::unique_ptr<OlapTableLocationParam> location;
std::unique_ptr<OlapTabletFinder> tablet_finder;
std::unique_ptr<OlapTableBlockConvertor> block_convertor;
VExprContextSPtrs output_expr_ctxs;
std::unique_ptr<RowDescriptor> output_row_desc;
VRowDistribution row_distribution;
};
Status _noop_create_partition_callback(void*, TCreatePartitionResult*) {
return Status::OK();
}
std::unique_ptr<VRowDistributionHarness> _build_vrow_distribution_harness(
OperatorContext& ctx, const TOlapTableSchemaParam& tschema,
const TOlapTablePartitionParam& tpartition, const TOlapTableLocationParam& tlocation,
TTupleId tablet_sink_tuple_id, int64_t txn_id) {
auto h = std::make_unique<VRowDistributionHarness>();
h->schema = std::make_shared<OlapTableSchemaParam>();
auto st = h->schema->init(tschema);
EXPECT_TRUE(st.ok()) << st.to_string();
h->vpartition = std::make_unique<VOlapTablePartitionParam>(h->schema, tpartition);
st = h->vpartition->init();
EXPECT_TRUE(st.ok()) << st.to_string();
h->location = std::make_unique<OlapTableLocationParam>(tlocation);
h->tablet_finder = std::make_unique<OlapTabletFinder>(h->vpartition.get(),
OlapTabletFinder::FIND_TABLET_EVERY_ROW);
h->block_convertor = std::make_unique<OlapTableBlockConvertor>(h->schema->tuple_desc());
h->output_row_desc = std::make_unique<RowDescriptor>(
ctx.state.desc_tbl(), std::vector<TTupleId> {tablet_sink_tuple_id});
VRowDistribution::VRowDistributionContext rctx;
rctx.state = &ctx.state;
rctx.block_convertor = h->block_convertor.get();
rctx.tablet_finder = h->tablet_finder.get();
rctx.vpartition = h->vpartition.get();
rctx.add_partition_request_timer = nullptr;
rctx.txn_id = txn_id;
rctx.pool = &ctx.pool;
rctx.location = h->location.get();
rctx.vec_output_expr_ctxs = &h->output_expr_ctxs;
rctx.schema = h->schema;
rctx.caller = nullptr;
rctx.write_single_replica = false;
rctx.create_partition_callback = &_noop_create_partition_callback;
h->row_distribution.init(rctx);
st = h->row_distribution.open(h->output_row_desc.get());
EXPECT_TRUE(st.ok()) << st.to_string();
return h;
}
TEST(VRowDistributionTest, GenerateRowsDistributionNonAutoPartitionBasic) {
OperatorContext ctx;
constexpr int64_t txn_id = 1;
TOlapTableSchemaParam tschema;
TTupleId tablet_sink_tuple_id = 0;
int64_t schema_index_id = 0;
sink_test_utils::build_desc_tbl_and_schema(ctx, tschema, tablet_sink_tuple_id, schema_index_id,
false);
auto tpartition = sink_test_utils::build_partition_param(schema_index_id);
auto tlocation = sink_test_utils::build_location_param();
auto h = _build_vrow_distribution_harness(ctx, tschema, tpartition, tlocation,
tablet_sink_tuple_id, txn_id);
auto input_block = ColumnHelper::create_block<DataTypeInt32>({1, 25});
std::shared_ptr<Block> converted_block;
std::vector<RowPartTabletIds> row_part_tablet_ids;
int64_t rows_stat_val = input_block.rows();
auto st = h->row_distribution.generate_rows_distribution(input_block, converted_block,
row_part_tablet_ids, rows_stat_val);
ASSERT_TRUE(st.ok()) << st.to_string();
ASSERT_NE(converted_block, nullptr);
ASSERT_EQ(row_part_tablet_ids.size(), 1);
ASSERT_EQ(row_part_tablet_ids[0].row_ids.size(), 2);
EXPECT_EQ(row_part_tablet_ids[0].row_ids[0], 0);
EXPECT_EQ(row_part_tablet_ids[0].row_ids[1], 1);
EXPECT_EQ(row_part_tablet_ids[0].partition_ids[0], 1);
EXPECT_EQ(row_part_tablet_ids[0].partition_ids[1], 2);
}
TEST(VRowDistributionTest, GenerateRowsDistributionSkipsImmutablePartition) {
OperatorContext ctx;
constexpr int64_t txn_id = 1;
TOlapTableSchemaParam tschema;
TTupleId tablet_sink_tuple_id = 0;
int64_t schema_index_id = 0;
sink_test_utils::build_desc_tbl_and_schema(ctx, tschema, tablet_sink_tuple_id, schema_index_id,
false);
auto tpartition = sink_test_utils::build_partition_param(schema_index_id);
ASSERT_EQ(tpartition.partitions.size(), 2);
tpartition.partitions[0].__set_is_mutable(false);
auto tlocation = sink_test_utils::build_location_param();
auto h = _build_vrow_distribution_harness(ctx, tschema, tpartition, tlocation,
tablet_sink_tuple_id, txn_id);
auto input_block = ColumnHelper::create_block<DataTypeInt32>({1});
std::shared_ptr<Block> converted_block;
std::vector<RowPartTabletIds> row_part_tablet_ids;
int64_t rows_stat_val = input_block.rows();
auto st = h->row_distribution.generate_rows_distribution(input_block, converted_block,
row_part_tablet_ids, rows_stat_val);
ASSERT_TRUE(st.ok()) << st.to_string();
ASSERT_EQ(row_part_tablet_ids.size(), 1);
EXPECT_TRUE(row_part_tablet_ids[0].row_ids.empty());
auto skipped = h->row_distribution.get_skipped();
ASSERT_EQ(skipped.size(), 1);
EXPECT_TRUE(skipped[0]);
}
TEST(VRowDistributionTest, GenerateRowsDistributionWhereClauseConstFalseFiltersAllRows) {
OperatorContext ctx;
constexpr int64_t txn_id = 1;
TOlapTableSchemaParam tschema;
TTupleId tablet_sink_tuple_id = 0;
int64_t schema_index_id = 0;
sink_test_utils::build_desc_tbl_and_schema(ctx, tschema, tablet_sink_tuple_id, schema_index_id,
false);
auto tpartition = sink_test_utils::build_partition_param(schema_index_id);
auto tlocation = sink_test_utils::build_location_param();
auto h = _build_vrow_distribution_harness(ctx, tschema, tpartition, tlocation,
tablet_sink_tuple_id, txn_id);
TExpr texpr;
texpr.nodes.emplace_back(sink_test_utils::make_bool_literal(false));
VExprContextSPtr where_ctx;
auto st = VExpr::create_expr_tree(texpr, where_ctx);
ASSERT_TRUE(st.ok()) << st.to_string();
st = where_ctx->prepare(&ctx.state, *h->output_row_desc);
ASSERT_TRUE(st.ok()) << st.to_string();
st = where_ctx->open(&ctx.state);
ASSERT_TRUE(st.ok()) << st.to_string();
h->schema->indexes()[0]->where_clause = where_ctx;
auto input_block = ColumnHelper::create_block<DataTypeInt32>({1, 25});
std::shared_ptr<Block> converted_block;
std::vector<RowPartTabletIds> row_part_tablet_ids;
int64_t rows_stat_val = input_block.rows();
st = h->row_distribution.generate_rows_distribution(input_block, converted_block,
row_part_tablet_ids, rows_stat_val);
ASSERT_TRUE(st.ok()) << st.to_string();
ASSERT_EQ(row_part_tablet_ids.size(), 1);
EXPECT_TRUE(row_part_tablet_ids[0].row_ids.empty());
EXPECT_TRUE(row_part_tablet_ids[0].partition_ids.empty());
EXPECT_TRUE(row_part_tablet_ids[0].tablet_ids.empty());
}
TEST(VRowDistributionTest, GenerateRowsDistributionWhereClauseUInt8ColumnFiltersSomeRows) {
OperatorContext ctx;
constexpr int64_t txn_id = 1;
TOlapTableSchemaParam tschema;
TTupleId tablet_sink_tuple_id = 0;
int64_t schema_index_id = 0;
sink_test_utils::build_desc_tbl_and_schema(ctx, tschema, tablet_sink_tuple_id, schema_index_id,
false);
auto tpartition = sink_test_utils::build_partition_param(schema_index_id);
auto tlocation = sink_test_utils::build_location_param();
auto h = _build_vrow_distribution_harness(ctx, tschema, tpartition, tlocation,
tablet_sink_tuple_id, txn_id);
auto where_ctx = VExprContext::create_shared(
std::make_shared<MockSlotRef>(1, std::make_shared<DataTypeUInt8>()));
auto st = where_ctx->prepare(&ctx.state, *h->output_row_desc);
ASSERT_TRUE(st.ok()) << st.to_string();
st = where_ctx->open(&ctx.state);
ASSERT_TRUE(st.ok()) << st.to_string();
h->schema->indexes()[0]->where_clause = where_ctx;
auto input_block = ColumnHelper::create_block<DataTypeInt32>({1, 25, 2});
auto filter_col_mut = ColumnUInt8::create();
filter_col_mut->get_data().push_back(1);
filter_col_mut->get_data().push_back(0);
filter_col_mut->get_data().push_back(1);
ColumnPtr filter_col = std::move(filter_col_mut);
input_block.insert({filter_col, std::make_shared<DataTypeUInt8>(), "f"});
std::shared_ptr<Block> converted_block;
std::vector<RowPartTabletIds> row_part_tablet_ids;
int64_t rows_stat_val = input_block.rows();
st = h->row_distribution.generate_rows_distribution(input_block, converted_block,
row_part_tablet_ids, rows_stat_val);
ASSERT_TRUE(st.ok()) << st.to_string();
ASSERT_EQ(row_part_tablet_ids.size(), 1);
ASSERT_EQ(row_part_tablet_ids[0].row_ids.size(), 2);
EXPECT_EQ(row_part_tablet_ids[0].row_ids[0], 0);
EXPECT_EQ(row_part_tablet_ids[0].row_ids[1], 2);
}
TEST(VRowDistributionTest, AutoPartitionMissingValuesBatchingDedupAndCreatePartition) {
OperatorContext ctx;
constexpr int64_t txn_id = 1;
TOlapTableSchemaParam tschema;
TTupleId tablet_sink_tuple_id = 0;
int64_t schema_index_id = 0;
sink_test_utils::build_desc_tbl_and_schema(ctx, tschema, tablet_sink_tuple_id, schema_index_id,
false);
TSlotId partition_slot_id = tschema.slot_descs[0].id;
auto tpartition = sink_test_utils::build_auto_partition_param(
schema_index_id, tablet_sink_tuple_id, partition_slot_id);
auto tlocation = sink_test_utils::build_location_param();
auto h = _build_vrow_distribution_harness(ctx, tschema, tpartition, tlocation,
tablet_sink_tuple_id, txn_id);
auto input_block = ColumnHelper::create_block<DataTypeInt32>({15, 15});
std::shared_ptr<Block> converted_block;
std::vector<RowPartTabletIds> row_part_tablet_ids;
int64_t rows_stat_val = input_block.rows();
auto st = h->row_distribution.generate_rows_distribution(input_block, converted_block,
row_part_tablet_ids, rows_stat_val);
ASSERT_TRUE(st.ok()) << st.to_string();
ASSERT_TRUE(h->row_distribution._batching_block);
EXPECT_EQ(h->row_distribution._batching_block->rows(), 2);
h->row_distribution._deal_batched = true;
EXPECT_TRUE(h->row_distribution.need_deal_batching());
doris::config::enable_debug_points = true;
doris::DebugPoints::instance()->clear();
bool injected = false;
std::function<void(doris::TCreatePartitionRequest*, doris::TCreatePartitionResult*)> handler =
[&](doris::TCreatePartitionRequest* req, doris::TCreatePartitionResult* res) {
injected = true;
ASSERT_TRUE(req->__isset.partitionValues);
ASSERT_EQ(req->partitionValues.size(), 1);
ASSERT_EQ(req->partitionValues[0].size(), 1);
ASSERT_TRUE(req->partitionValues[0][0].__isset.value);
EXPECT_EQ(req->partitionValues[0][0].value, "15");
doris::TStatus tstatus;
tstatus.__set_status_code(doris::TStatusCode::OK);
res->__set_status(tstatus);
doris::TOlapTablePartition new_part;
new_part.id = 3;
new_part.num_buckets = 1;
new_part.__set_is_mutable(true);
{
doris::TOlapTableIndexTablets index_tablets;
index_tablets.index_id = schema_index_id;
index_tablets.tablets = {300};
new_part.indexes = {index_tablets};
}
new_part.__set_start_keys({sink_test_utils::make_int_literal(10)});
new_part.__set_end_keys({sink_test_utils::make_int_literal(20)});
res->__set_partitions({new_part});
doris::TTabletLocation new_location;
new_location.__set_tablet_id(300);
new_location.__set_node_ids({1});
res->__set_tablets({new_location});
};
doris::DebugPoints::instance()->add_with_callback(
"VRowDistribution.automatic_create_partition.inject_result", handler);
st = h->row_distribution.automatic_create_partition();
EXPECT_TRUE(st.ok()) << st.to_string();
EXPECT_TRUE(injected);
auto check_block = ColumnHelper::create_block<DataTypeInt32>({15});
std::vector<VOlapTablePartition*> parts(1, nullptr);
h->vpartition->find_partition(&check_block, 0, parts[0]);
ASSERT_NE(parts[0], nullptr);
EXPECT_EQ(parts[0]->id, 3);
h->row_distribution.clear_batching_stats();
EXPECT_FALSE(h->row_distribution.need_deal_batching());
doris::DebugPoints::instance()->clear();
doris::config::enable_debug_points = false;
}
TEST(VRowDistributionTest, ReplaceOverwritingPartitionInjectedRequestDedupAndReplace) {
OperatorContext ctx;
constexpr int64_t txn_id = 1;
TOlapTableSchemaParam tschema;
TTupleId tablet_sink_tuple_id = 0;
int64_t schema_index_id = 0;
sink_test_utils::build_desc_tbl_and_schema(ctx, tschema, tablet_sink_tuple_id, schema_index_id,
false);
auto tpartition = sink_test_utils::build_partition_param(schema_index_id);
tpartition.__set_enable_auto_detect_overwrite(true);
tpartition.__set_overwrite_group_id(123);
auto tlocation = sink_test_utils::build_location_param();
auto h = _build_vrow_distribution_harness(ctx, tschema, tpartition, tlocation,
tablet_sink_tuple_id, txn_id);
doris::config::enable_debug_points = true;
doris::DebugPoints::instance()->clear();
int injected_times = 0;
std::function<void(doris::TReplacePartitionRequest*, doris::TReplacePartitionResult*)> handler =
[&](doris::TReplacePartitionRequest* req, doris::TReplacePartitionResult* res) {
injected_times++;
ASSERT_TRUE(req->__isset.partition_ids);
ASSERT_EQ(req->partition_ids.size(), 2);
EXPECT_EQ(req->partition_ids[0], 1);
EXPECT_EQ(req->partition_ids[1], 2);
ASSERT_TRUE(req->__isset.overwrite_group_id);
EXPECT_EQ(req->overwrite_group_id, 123);
doris::TStatus tstatus;
tstatus.__set_status_code(doris::TStatusCode::OK);
res->__set_status(tstatus);
doris::TOlapTablePartition new_p1;
new_p1.id = 11;
new_p1.num_buckets = 1;
new_p1.__set_is_mutable(true);
{
doris::TOlapTableIndexTablets index_tablets;
index_tablets.index_id = schema_index_id;
index_tablets.tablets = {1100};
new_p1.indexes = {index_tablets};
}
doris::TOlapTablePartition new_p2;
new_p2.id = 12;
new_p2.num_buckets = 1;
new_p2.__set_is_mutable(true);
{
doris::TOlapTableIndexTablets index_tablets;
index_tablets.index_id = schema_index_id;
index_tablets.tablets = {1200};
new_p2.indexes = {index_tablets};
}
res->__set_partitions({new_p1, new_p2});
doris::TTabletLocation loc1;
loc1.__set_tablet_id(1100);
loc1.__set_node_ids({1});
doris::TTabletLocation loc2;
loc2.__set_tablet_id(1200);
loc2.__set_node_ids({1});
res->__set_tablets({loc1, loc2});
};
doris::DebugPoints::instance()->add_with_callback(
"VRowDistribution.replace_overwriting_partition.inject_result", handler);
Status st;
{
auto input_block = ColumnHelper::create_block<DataTypeInt32>({1, 25});
std::shared_ptr<Block> converted_block;
std::vector<RowPartTabletIds> row_part_tablet_ids;
int64_t rows_stat_val = input_block.rows();
st = h->row_distribution.generate_rows_distribution(input_block, converted_block,
row_part_tablet_ids, rows_stat_val);
EXPECT_TRUE(st.ok()) << st.to_string();
EXPECT_EQ(injected_times, 1);
ASSERT_EQ(row_part_tablet_ids.size(), 1);
ASSERT_EQ(row_part_tablet_ids[0].partition_ids.size(), 2);
EXPECT_EQ(row_part_tablet_ids[0].partition_ids[0], 11);
EXPECT_EQ(row_part_tablet_ids[0].partition_ids[1], 12);
ASSERT_EQ(row_part_tablet_ids[0].tablet_ids.size(), 2);
EXPECT_EQ(row_part_tablet_ids[0].tablet_ids[0], 1100);
EXPECT_EQ(row_part_tablet_ids[0].tablet_ids[1], 1200);
}
{
auto input_block = ColumnHelper::create_block<DataTypeInt32>({1});
std::shared_ptr<Block> converted_block;
std::vector<RowPartTabletIds> row_part_tablet_ids;
int64_t rows_stat_val = input_block.rows();
st = h->row_distribution.generate_rows_distribution(input_block, converted_block,
row_part_tablet_ids, rows_stat_val);
EXPECT_TRUE(st.ok()) << st.to_string();
EXPECT_EQ(injected_times, 1);
}
doris::DebugPoints::instance()->clear();
doris::config::enable_debug_points = false;
}
} // namespace
} // namespace doris