blob: 0c6c48faa3be5c4fbbc595db70c9c9382400d6c6 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "olap/cumulative_compaction.h"
#include <gmock/gmock-actions.h>
#include <gmock/gmock-matchers.h>
#include <gtest/gtest-message.h>
#include <gtest/gtest-test-part.h>
#include <gtest/gtest.h>
#include <filesystem>
#include <memory>
#include "common/status.h"
#include "cpp/sync_point.h"
#include "gtest/gtest_pred_impl.h"
#include "io/fs/local_file_system.h"
#include "olap/compaction.h"
#include "olap/cumulative_compaction_policy.h"
#include "olap/data_dir.h"
#include "olap/rowset/beta_rowset.h"
#include "olap/rowset/rowset_factory.h"
#include "olap/storage_engine.h"
#include "olap/tablet_manager.h"
#include "util/threadpool.h"
namespace doris {
using namespace config;
class CumulativeCompactionTest : public testing::Test {
public:
virtual void SetUp() {}
virtual void TearDown() {}
};
static RowsetSharedPtr create_rowset(Version version, int num_segments, bool overlapping,
int data_size) {
auto rs_meta = std::make_shared<RowsetMeta>();
rs_meta->set_rowset_type(BETA_ROWSET); // important
rs_meta->_rowset_meta_pb.set_start_version(version.first);
rs_meta->_rowset_meta_pb.set_end_version(version.second);
rs_meta->set_num_segments(num_segments);
rs_meta->set_segments_overlap(overlapping ? OVERLAPPING : NONOVERLAPPING);
rs_meta->set_total_disk_size(data_size);
RowsetSharedPtr rowset;
Status st = RowsetFactory::create_rowset(nullptr, "", std::move(rs_meta), &rowset);
if (!st.ok()) {
return nullptr;
}
return rowset;
}
TEST_F(CumulativeCompactionTest, TestConsecutiveVersion) {
EngineOptions options;
StorageEngine storage_engine(options);
//TabletSharedPtr tablet;
TabletMetaSharedPtr tablet_meta;
tablet_meta.reset(new TabletMeta(1, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}},
UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK,
TCompressionType::LZ4F));
TabletSharedPtr tablet(
new Tablet(storage_engine, tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
CumulativeCompaction cumu_compaction(storage_engine, tablet);
{
std::vector<RowsetSharedPtr> rowsets;
for (int i = 2; i < 10; ++i) {
RowsetSharedPtr rs = create_rowset({i, i}, 1, false, 1024);
rowsets.push_back(rs);
}
std::vector<Version> missing_version;
cumu_compaction.find_longest_consecutive_version(&rowsets, &missing_version);
EXPECT_EQ(rowsets.size(), 8);
EXPECT_EQ(rowsets.front()->start_version(), 2);
EXPECT_EQ(rowsets.front()->end_version(), 2);
EXPECT_EQ(rowsets.back()->start_version(), 9);
EXPECT_EQ(rowsets.back()->end_version(), 9);
EXPECT_EQ(missing_version.size(), 0);
}
{
std::vector<RowsetSharedPtr> rowsets;
for (int i = 2; i <= 4; ++i) {
RowsetSharedPtr rs = create_rowset({i, i}, 1, false, 1024);
rowsets.push_back(rs);
}
for (int i = 6; i <= 10; ++i) {
RowsetSharedPtr rs = create_rowset({i, i}, 1, false, 1024);
rowsets.push_back(rs);
}
for (int i = 12; i <= 13; ++i) {
RowsetSharedPtr rs = create_rowset({i, i}, 1, false, 1024);
rowsets.push_back(rs);
}
std::vector<Version> missing_version;
cumu_compaction.find_longest_consecutive_version(&rowsets, &missing_version);
EXPECT_EQ(rowsets.size(), 5);
EXPECT_EQ(rowsets.front()->start_version(), 6);
EXPECT_EQ(rowsets.front()->end_version(), 6);
EXPECT_EQ(rowsets.back()->start_version(), 10);
EXPECT_EQ(rowsets.back()->end_version(), 10);
EXPECT_EQ(missing_version.size(), 4);
EXPECT_EQ(missing_version[0].first, 4);
EXPECT_EQ(missing_version[0].second, 4);
EXPECT_EQ(missing_version[1].first, 6);
EXPECT_EQ(missing_version[1].second, 6);
EXPECT_EQ(missing_version[2].first, 10);
EXPECT_EQ(missing_version[2].second, 10);
EXPECT_EQ(missing_version[3].first, 12);
EXPECT_EQ(missing_version[3].second, 12);
}
{
std::vector<RowsetSharedPtr> rowsets;
for (int i = 2; i <= 2; ++i) {
RowsetSharedPtr rs = create_rowset({i, i}, 1, false, 1024);
rowsets.push_back(rs);
}
for (int i = 4; i <= 4; ++i) {
RowsetSharedPtr rs = create_rowset({i, i}, 1, false, 1024);
rowsets.push_back(rs);
}
std::vector<Version> missing_version;
cumu_compaction.find_longest_consecutive_version(&rowsets, &missing_version);
EXPECT_EQ(rowsets.size(), 1);
EXPECT_EQ(rowsets.front()->start_version(), 2);
EXPECT_EQ(rowsets.front()->end_version(), 2);
EXPECT_EQ(rowsets.back()->start_version(), 2);
EXPECT_EQ(rowsets.back()->end_version(), 2);
EXPECT_EQ(missing_version.size(), 2);
EXPECT_EQ(missing_version[0].first, 2);
EXPECT_EQ(missing_version[0].second, 2);
EXPECT_EQ(missing_version[1].first, 4);
EXPECT_EQ(missing_version[1].second, 4);
}
{
std::vector<RowsetSharedPtr> rowsets;
RowsetSharedPtr rs = create_rowset({2, 3}, 1, false, 1024);
rowsets.push_back(rs);
rs = create_rowset({4, 5}, 1, false, 1024);
rowsets.push_back(rs);
rs = create_rowset({9, 11}, 1, false, 1024);
rowsets.push_back(rs);
rs = create_rowset({12, 13}, 1, false, 1024);
rowsets.push_back(rs);
std::vector<Version> missing_version;
cumu_compaction.find_longest_consecutive_version(&rowsets, &missing_version);
EXPECT_EQ(rowsets.size(), 2);
EXPECT_EQ(rowsets.front()->start_version(), 2);
EXPECT_EQ(rowsets.front()->end_version(), 3);
EXPECT_EQ(rowsets.back()->start_version(), 4);
EXPECT_EQ(rowsets.back()->end_version(), 5);
EXPECT_EQ(missing_version.size(), 2);
EXPECT_EQ(missing_version[0].first, 4);
EXPECT_EQ(missing_version[0].second, 5);
EXPECT_EQ(missing_version[1].first, 9);
EXPECT_EQ(missing_version[1].second, 11);
}
{
std::vector<RowsetSharedPtr> rowsets;
for (int i = 2; i <= 2; ++i) {
RowsetSharedPtr rs = create_rowset({i, i}, 1, false, 1024);
rowsets.push_back(rs);
}
std::vector<Version> missing_version;
cumu_compaction.find_longest_consecutive_version(&rowsets, &missing_version);
EXPECT_EQ(rowsets.size(), 1);
EXPECT_EQ(rowsets.front()->start_version(), 2);
EXPECT_EQ(rowsets.front()->end_version(), 2);
EXPECT_EQ(rowsets.back()->start_version(), 2);
EXPECT_EQ(rowsets.back()->end_version(), 2);
EXPECT_EQ(missing_version.size(), 0);
}
{
std::vector<RowsetSharedPtr> rowsets;
for (int i = 2; i <= 2; ++i) {
RowsetSharedPtr rs = create_rowset({i, i}, 1, false, 1024);
rowsets.push_back(rs);
}
std::vector<Version> missing_version;
cumu_compaction.find_longest_consecutive_version(&rowsets, &missing_version);
EXPECT_EQ(rowsets.size(), 1);
EXPECT_EQ(rowsets.front()->start_version(), 2);
EXPECT_EQ(rowsets.front()->end_version(), 2);
EXPECT_EQ(rowsets.back()->start_version(), 2);
EXPECT_EQ(rowsets.back()->end_version(), 2);
EXPECT_EQ(missing_version.size(), 0);
}
{
std::vector<RowsetSharedPtr> rowsets;
for (int i = 2; i <= 4; ++i) {
RowsetSharedPtr rs = create_rowset({i, i}, 1, false, 1024);
rowsets.push_back(rs);
}
for (int i = 6; i <= 10; ++i) {
RowsetSharedPtr rs = create_rowset({i, i}, 1, false, 1024);
rowsets.push_back(rs);
}
for (int i = 12; i <= 20; ++i) {
RowsetSharedPtr rs = create_rowset({i, i}, 1, false, 1024);
rowsets.push_back(rs);
}
std::vector<Version> missing_version;
cumu_compaction.find_longest_consecutive_version(&rowsets, &missing_version);
EXPECT_EQ(rowsets.size(), 9);
EXPECT_EQ(rowsets.front()->start_version(), 12);
EXPECT_EQ(rowsets.front()->end_version(), 12);
EXPECT_EQ(rowsets.back()->start_version(), 20);
EXPECT_EQ(rowsets.back()->end_version(), 20);
EXPECT_EQ(missing_version.size(), 4);
EXPECT_EQ(missing_version[0].first, 4);
EXPECT_EQ(missing_version[0].second, 4);
EXPECT_EQ(missing_version[1].first, 6);
EXPECT_EQ(missing_version[1].second, 6);
EXPECT_EQ(missing_version[2].first, 10);
EXPECT_EQ(missing_version[2].second, 10);
EXPECT_EQ(missing_version[3].first, 12);
EXPECT_EQ(missing_version[3].second, 12);
}
}
TEST_F(CumulativeCompactionTest, TestShouldDelayLargeTask) {
// Initialize storage engine and thread pool
EngineOptions options;
StorageEngine storage_engine(options);
EXPECT_EQ(ThreadPoolBuilder("CumuCompactionTaskThreadPool")
.set_min_threads(1)
.set_max_threads(2)
.build(&storage_engine._cumu_compaction_thread_pool),
Status::OK());
// Configure parameters
config::large_cumu_compaction_task_min_thread_num = 3;
config::large_cumu_compaction_task_row_num_threshold = 10;
// Set thread pool max threads
EXPECT_EQ(storage_engine._cumu_compaction_thread_pool->set_max_threads(3), Status::OK());
storage_engine._cumu_compaction_thread_pool_used_threads = 2;
EXPECT_EQ(storage_engine._should_delay_large_task(), false);
storage_engine._cumu_compaction_thread_pool_used_threads = 3;
storage_engine._cumu_compaction_thread_pool_small_tasks_running = 1;
EXPECT_EQ(storage_engine._should_delay_large_task(), false);
storage_engine._cumu_compaction_thread_pool_used_threads = 3;
storage_engine._cumu_compaction_thread_pool_small_tasks_running = 0;
EXPECT_EQ(storage_engine._should_delay_large_task(), true);
}
} // namespace doris