blob: ce467fb1d45f69566949f336884599888185d576 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "vec/sink/writer/vtablet_writer_v2.h"
#include <gtest/gtest.h>
#include "vec/sink/load_stream_map_pool.h"
#include "vec/sink/load_stream_stub.h"
namespace doris {
class TestVTabletWriterV2 : public ::testing::Test {
public:
TestVTabletWriterV2() = default;
~TestVTabletWriterV2() override = default;
static void SetUpTestSuite() {}
static void TearDownTestSuite() {}
};
const int64_t src_id = 1000;
static void add_stream(std::shared_ptr<LoadStreamMap> load_stream_map, int64_t node_id,
std::vector<int64_t> success_tablets,
std::unordered_map<int64_t, Status> failed_tablets) {
auto streams = load_stream_map->get_or_create(node_id);
streams->mark_open();
for (const auto& tablet_id : success_tablets) {
streams->select_one_stream()->add_success_tablet(tablet_id);
}
for (const auto& [tablet_id, reason] : failed_tablets) {
streams->select_one_stream()->add_failed_tablet(tablet_id, reason);
}
}
static std::unique_ptr<vectorized::VTabletWriterV2> create_vtablet_writer(int num_replicas = 3) {
TDataSink t_sink;
t_sink.__isset.olap_table_sink = true;
t_sink.olap_table_sink.num_replicas = num_replicas;
vectorized::VExprContextSPtrs output_exprs;
std::shared_ptr<pipeline::Dependency> dep = nullptr;
std::shared_ptr<pipeline::Dependency> fin_dep = nullptr;
auto writer = std::make_unique<vectorized::VTabletWriterV2>(t_sink, output_exprs, dep, fin_dep);
int required_replicas = num_replicas / 2 + 1;
writer->_tablet_replica_info[1] = std::make_pair(num_replicas, required_replicas);
writer->_tablet_replica_info[2] = std::make_pair(num_replicas, required_replicas);
return writer;
}
TEST_F(TestVTabletWriterV2, one_replica) {
UniqueId load_id;
std::vector<TTabletCommitInfo> tablet_commit_infos;
std::shared_ptr<LoadStreamMap> load_stream_map =
std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
add_stream(load_stream_map, 1001, {1, 2}, {});
auto writer = create_vtablet_writer(1);
auto st = writer->_create_commit_info(tablet_commit_infos, load_stream_map);
ASSERT_TRUE(st.ok());
ASSERT_EQ(tablet_commit_infos.size(), 2);
}
TEST_F(TestVTabletWriterV2, one_replica_fail) {
UniqueId load_id;
std::vector<TTabletCommitInfo> tablet_commit_infos;
std::shared_ptr<LoadStreamMap> load_stream_map =
std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
add_stream(load_stream_map, 1001, {1}, {{2, Status::InternalError("test")}});
auto writer = create_vtablet_writer(1);
auto st = writer->_create_commit_info(tablet_commit_infos, load_stream_map);
ASSERT_EQ(st, Status::InternalError("test"));
}
TEST_F(TestVTabletWriterV2, two_replica) {
UniqueId load_id;
std::vector<TTabletCommitInfo> tablet_commit_infos;
std::shared_ptr<LoadStreamMap> load_stream_map =
std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
add_stream(load_stream_map, 1001, {1, 2}, {});
add_stream(load_stream_map, 1002, {1, 2}, {});
auto writer = create_vtablet_writer(2);
auto st = writer->_create_commit_info(tablet_commit_infos, load_stream_map);
ASSERT_TRUE(st.ok());
ASSERT_EQ(tablet_commit_infos.size(), 4);
}
TEST_F(TestVTabletWriterV2, two_replica_fail) {
UniqueId load_id;
std::vector<TTabletCommitInfo> tablet_commit_infos;
std::shared_ptr<LoadStreamMap> load_stream_map =
std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
add_stream(load_stream_map, 1001, {1}, {{2, Status::InternalError("test")}});
add_stream(load_stream_map, 1002, {1, 2}, {});
auto writer = create_vtablet_writer(2);
auto st = writer->_create_commit_info(tablet_commit_infos, load_stream_map);
ASSERT_EQ(st, Status::InternalError("test"));
}
TEST_F(TestVTabletWriterV2, normal) {
UniqueId load_id;
std::vector<TTabletCommitInfo> tablet_commit_infos;
std::shared_ptr<LoadStreamMap> load_stream_map =
std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
add_stream(load_stream_map, 1001, {1, 2}, {});
add_stream(load_stream_map, 1002, {1, 2}, {});
add_stream(load_stream_map, 1003, {1, 2}, {});
auto writer = create_vtablet_writer();
auto st = writer->_create_commit_info(tablet_commit_infos, load_stream_map);
ASSERT_TRUE(st.ok());
ASSERT_EQ(tablet_commit_infos.size(), 6);
}
TEST_F(TestVTabletWriterV2, miss_one) {
UniqueId load_id;
std::vector<TTabletCommitInfo> tablet_commit_infos;
std::shared_ptr<LoadStreamMap> load_stream_map =
std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
add_stream(load_stream_map, 1001, {1, 2}, {});
add_stream(load_stream_map, 1002, {1}, {});
add_stream(load_stream_map, 1003, {1, 2}, {});
auto writer = create_vtablet_writer();
auto st = writer->_create_commit_info(tablet_commit_infos, load_stream_map);
ASSERT_TRUE(st.ok());
ASSERT_EQ(tablet_commit_infos.size(), 5);
}
TEST_F(TestVTabletWriterV2, miss_two) {
UniqueId load_id;
std::vector<TTabletCommitInfo> tablet_commit_infos;
std::shared_ptr<LoadStreamMap> load_stream_map =
std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
add_stream(load_stream_map, 1001, {1, 2}, {});
add_stream(load_stream_map, 1002, {1}, {});
add_stream(load_stream_map, 1003, {1}, {});
auto writer = create_vtablet_writer();
auto st = writer->_create_commit_info(tablet_commit_infos, load_stream_map);
ASSERT_TRUE(st.ok());
ASSERT_EQ(tablet_commit_infos.size(), 4);
}
TEST_F(TestVTabletWriterV2, fail_one) {
UniqueId load_id;
std::vector<TTabletCommitInfo> tablet_commit_infos;
std::shared_ptr<LoadStreamMap> load_stream_map =
std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
add_stream(load_stream_map, 1001, {1, 2}, {});
add_stream(load_stream_map, 1002, {1}, {{2, Status::InternalError("test")}});
add_stream(load_stream_map, 1003, {1, 2}, {});
auto writer = create_vtablet_writer();
auto st = writer->_create_commit_info(tablet_commit_infos, load_stream_map);
ASSERT_TRUE(st.ok());
ASSERT_EQ(tablet_commit_infos.size(), 5);
}
TEST_F(TestVTabletWriterV2, fail_one_duplicate) {
UniqueId load_id;
std::vector<TTabletCommitInfo> tablet_commit_infos;
std::shared_ptr<LoadStreamMap> load_stream_map =
std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
add_stream(load_stream_map, 1001, {1, 2}, {});
add_stream(load_stream_map, 1002, {1}, {{2, Status::InternalError("test")}});
add_stream(load_stream_map, 1002, {1}, {{2, Status::InternalError("test")}});
add_stream(load_stream_map, 1003, {1, 2}, {});
auto writer = create_vtablet_writer();
auto st = writer->_create_commit_info(tablet_commit_infos, load_stream_map);
// Duplicate tablets from same node should be ignored
ASSERT_TRUE(st.ok());
ASSERT_EQ(tablet_commit_infos.size(), 5);
}
TEST_F(TestVTabletWriterV2, fail_two_diff_tablet_same_node) {
UniqueId load_id;
std::vector<TTabletCommitInfo> tablet_commit_infos;
std::shared_ptr<LoadStreamMap> load_stream_map =
std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
add_stream(load_stream_map, 1001, {1, 2}, {});
add_stream(load_stream_map, 1002, {},
{{1, Status::InternalError("test")}, {2, Status::InternalError("test")}});
add_stream(load_stream_map, 1003, {1, 2}, {});
auto writer = create_vtablet_writer();
auto st = writer->_create_commit_info(tablet_commit_infos, load_stream_map);
ASSERT_TRUE(st.ok());
ASSERT_EQ(tablet_commit_infos.size(), 4);
}
TEST_F(TestVTabletWriterV2, fail_two_diff_tablet_diff_node) {
UniqueId load_id;
std::vector<TTabletCommitInfo> tablet_commit_infos;
std::shared_ptr<LoadStreamMap> load_stream_map =
std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
add_stream(load_stream_map, 1001, {1, 2}, {});
add_stream(load_stream_map, 1002, {1}, {{2, Status::InternalError("test")}});
add_stream(load_stream_map, 1003, {2}, {{1, Status::InternalError("test")}});
auto writer = create_vtablet_writer();
auto st = writer->_create_commit_info(tablet_commit_infos, load_stream_map);
ASSERT_TRUE(st.ok());
ASSERT_EQ(tablet_commit_infos.size(), 4);
}
TEST_F(TestVTabletWriterV2, fail_two_same_tablet) {
UniqueId load_id;
std::vector<TTabletCommitInfo> tablet_commit_infos;
std::shared_ptr<LoadStreamMap> load_stream_map =
std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
add_stream(load_stream_map, 1001, {1, 2}, {});
add_stream(load_stream_map, 1002, {1}, {{2, Status::InternalError("test")}});
add_stream(load_stream_map, 1003, {1}, {{2, Status::InternalError("test")}});
auto writer = create_vtablet_writer();
auto st = writer->_create_commit_info(tablet_commit_infos, load_stream_map);
// BE should detect and abort commit if majority of replicas failed
ASSERT_EQ(st, Status::InternalError("test"));
}
TEST_F(TestVTabletWriterV2, fail_two_miss_one_same_tablet) {
UniqueId load_id;
std::vector<TTabletCommitInfo> tablet_commit_infos;
std::shared_ptr<LoadStreamMap> load_stream_map =
std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
add_stream(load_stream_map, 1001, {1}, {});
add_stream(load_stream_map, 1002, {1}, {{2, Status::InternalError("test")}});
add_stream(load_stream_map, 1003, {1}, {{2, Status::InternalError("test")}});
auto writer = create_vtablet_writer();
auto st = writer->_create_commit_info(tablet_commit_infos, load_stream_map);
// BE should detect and abort commit if majority of replicas failed
ASSERT_EQ(st, Status::InternalError("test"));
}
} // namespace doris