blob: 3f2f7fe4ef6b77dd6d5b8f2a8ab11dc7793e3296 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "cloud/cloud_meta_mgr.h"
#include <gtest/gtest.h>
#include <chrono>
#include <memory>
#include <random>
#include <set>
#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet.h"
#include "cpp/sync_point.h"
#include "gen_cpp/cloud.pb.h"
#include "olap/olap_common.h"
#include "olap/rowset/rowset_factory.h"
#include "olap/rowset/rowset_meta.h"
#include "olap/tablet_meta.h"
#include "util/uid_util.h"
namespace doris {
using namespace cloud;
using namespace std::chrono;
class CloudMetaMgrTest : public testing::Test {
void SetUp() override {}
void TearDown() override {}
};
TEST_F(CloudMetaMgrTest, bthread_fork_join_test) {
// clang-format off
std::atomic<int> task_counter{0};
std::atomic<int> concurrent_tasks{0};
std::atomic<int> max_concurrent_tasks{0};
int num_tasks = 7;
int concurrency = 3;
int sleep_us = 10000;
std::mt19937 rng(system_clock::now().time_since_epoch().count());
std::uniform_int_distribution<int> dist(0, sleep_us);
std::vector<std::function<Status()>> tasks (
num_tasks,
[&]{
int current = ++concurrent_tasks;
int old_max = max_concurrent_tasks.load();
while (current > old_max && !max_concurrent_tasks.compare_exchange_weak(old_max, current)) {
// Update max if needed
}
bthread_usleep(sleep_us);
task_counter.fetch_add(1);
--concurrent_tasks;
return Status::OK();
}
);
// Test synchronous execution
{
auto start = steady_clock::now();
task_counter.store(0);
concurrent_tasks.store(0);
max_concurrent_tasks.store(0);
EXPECT_TRUE(bthread_fork_join(tasks, concurrency).ok());
auto end = steady_clock::now();
// All 7 tasks should have completed
EXPECT_EQ(task_counter.load(), num_tasks);
// With concurrency 3, we should see at most 3 concurrent tasks
EXPECT_LE(max_concurrent_tasks.load(), concurrency);
EXPECT_GT(max_concurrent_tasks.load(), 0);
EXPECT_GE(duration_cast<microseconds>(end - start).count(), (num_tasks * sleep_us / concurrency));
}
// Test asynchronous execution
{
auto start = steady_clock::now();
task_counter.store(0);
concurrent_tasks.store(0);
max_concurrent_tasks.store(0);
std::future<Status> fut;
auto t = tasks;
EXPECT_TRUE(bthread_fork_join(std::move(t), concurrency, &fut).ok()); // return immediately
EXPECT_LT(task_counter.load(), num_tasks);
// Initially no tasks should have completed
EXPECT_EQ(task_counter.load(), 0);
// Wait for completion
EXPECT_TRUE(fut.get().ok());
auto end = steady_clock::now();
// All 7 tasks should have completed
EXPECT_EQ(task_counter.load(), num_tasks);
// With concurrency 3, we should see at most 3 concurrent tasks
EXPECT_LE(max_concurrent_tasks.load(), concurrency);
EXPECT_GT(max_concurrent_tasks.load(), 0);
EXPECT_GE(duration_cast<microseconds>(end - start).count(), (num_tasks * sleep_us / concurrency));
}
// Test error handling - make the first task fail fast
{
auto start = steady_clock::now();
task_counter.store(0);
concurrent_tasks.store(0);
max_concurrent_tasks.store(0);
auto error_tasks = tasks;
error_tasks.insert(error_tasks.begin(), [&]{
// This task fails immediately
return Status::InternalError<false>("error");
});
EXPECT_FALSE(bthread_fork_join(error_tasks, concurrency).ok());
auto end = steady_clock::now();
// When first task fails, not all tasks may complete
// We can only verify that at least one task ran
EXPECT_LE(task_counter.load(), concurrency);
EXPECT_GE(duration_cast<microseconds>(end - start).count(), sleep_us);
}
// Test asynchronous error handling
{
auto start = steady_clock::now();
auto end = steady_clock::now();
task_counter.store(0);
concurrent_tasks.store(0);
max_concurrent_tasks.store(0);
auto error_tasks = tasks;
error_tasks.insert(error_tasks.begin(), [&]{
// This task fails immediately
return Status::InternalError<false>("error");
});
std::future<Status> fut;
auto t = error_tasks;
EXPECT_TRUE(bthread_fork_join(std::move(t), concurrency, &fut).ok()); // return immediately
EXPECT_LT(task_counter.load(), concurrency);
// Initially no tasks should have completed
EXPECT_EQ(task_counter.load(), 0);
// Wait for completion - should fail
EXPECT_FALSE(fut.get().ok());
end = steady_clock::now();
// When first task fails, not all tasks may complete
// We can only verify that at least one task ran
EXPECT_LE(task_counter.load(), concurrency);
EXPECT_GE(duration_cast<microseconds>(end - start).count(), sleep_us);
}
// clang-format on
}
TEST_F(CloudMetaMgrTest, test_fill_version_holes_no_holes) {
CloudStorageEngine engine(EngineOptions {});
CloudMetaMgr meta_mgr;
TabletMetaSharedPtr tablet_meta(
new TabletMeta(1001, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}},
UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK, TCompressionType::LZ4F));
auto tablet = std::make_shared<CloudTablet>(engine, std::make_shared<TabletMeta>(*tablet_meta));
// Add consecutive versions: 0, 1, 2, 3, 4
std::vector<RowsetSharedPtr> rowsets;
for (int64_t version = 0; version <= 4; ++version) {
auto rs_meta = std::make_shared<RowsetMeta>();
rs_meta->set_tablet_id(1001);
rs_meta->set_index_id(2);
rs_meta->set_partition_id(15673);
rs_meta->set_tablet_uid(UniqueId(9, 10));
rs_meta->set_version(Version(version, version));
rs_meta->set_rowset_type(BETA_ROWSET);
rs_meta->set_rowset_id(engine.next_rowset_id());
rs_meta->set_num_rows(100);
rs_meta->set_empty(false);
rs_meta->set_tablet_schema(tablet->tablet_schema());
// Create rowset and add it to tablet
RowsetSharedPtr rowset;
auto status = RowsetFactory::create_rowset(tablet->tablet_schema(), tablet->tablet_path(),
rs_meta, &rowset);
EXPECT_TRUE(status.ok());
rowsets.push_back(rowset);
}
// Add all rowsets to tablet
{
std::unique_lock<std::shared_mutex> lock(tablet->get_header_lock());
tablet->add_rowsets(std::move(rowsets), false, lock, false);
}
// Test fill_version_holes directly - should not add any rowsets since there are no holes
std::unique_lock<std::shared_mutex> wlock(tablet->get_header_lock());
Status status = meta_mgr.fill_version_holes(tablet.get(), 4, wlock);
EXPECT_TRUE(status.ok());
// Verify tablet still has the same number of rowsets (no holes to fill)
EXPECT_EQ(tablet->tablet_meta()->all_rs_metas().size(), 5);
// Verify rows number is correct
for (const auto& [_, rs_meta] : tablet->tablet_meta()->all_rs_metas()) {
EXPECT_EQ(rs_meta->num_rows(), 100);
}
}
TEST_F(CloudMetaMgrTest, test_fill_version_holes_with_holes) {
CloudStorageEngine engine(EngineOptions {});
CloudMetaMgr meta_mgr;
TabletMetaSharedPtr tablet_meta(
new TabletMeta(1001, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}},
UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK, TCompressionType::LZ4F));
auto tablet = std::make_shared<CloudTablet>(engine, std::make_shared<TabletMeta>(*tablet_meta));
// Add non-consecutive versions: 0, 2, 4 (missing 1, 3)
std::vector<int64_t> versions = {0, 2, 4};
std::vector<RowsetSharedPtr> rowsets;
for (int64_t version : versions) {
auto rs_meta = std::make_shared<RowsetMeta>();
rs_meta->set_tablet_id(1001);
rs_meta->set_index_id(2);
rs_meta->set_partition_id(15673);
rs_meta->set_tablet_uid(UniqueId(9, 10));
rs_meta->set_version(Version(version, version));
rs_meta->set_rowset_type(BETA_ROWSET);
rs_meta->set_rowset_id(engine.next_rowset_id());
rs_meta->set_num_rows(100);
rs_meta->set_empty(false);
rs_meta->set_tablet_schema(tablet->tablet_schema());
// Create rowset and add it to list
RowsetSharedPtr rowset;
auto status = RowsetFactory::create_rowset(tablet->tablet_schema(), tablet->tablet_path(),
rs_meta, &rowset);
EXPECT_TRUE(status.ok());
rowsets.push_back(rowset);
}
// Add all rowsets to tablet
{
std::unique_lock<std::shared_mutex> lock(tablet->get_header_lock());
tablet->add_rowsets(std::move(rowsets), false, lock, false);
}
// Initially we have 3 rowsets (versions 0, 2, 4)
EXPECT_EQ(tablet->tablet_meta()->all_rs_metas().size(), 3);
// Test fill_version_holes directly to fill missing versions 1 and 3
std::unique_lock<std::shared_mutex> wlock(tablet->get_header_lock());
Status status = meta_mgr.fill_version_holes(tablet.get(), 4, wlock);
EXPECT_TRUE(status.ok());
// After filling holes, we should have 5 rowsets (versions 0, 1, 2, 3, 4)
EXPECT_EQ(tablet->tablet_meta()->all_rs_metas().size(), 5);
// Verify all versions are present
auto rs_metas = tablet->tablet_meta()->all_rs_metas();
std::set<int64_t> found_versions;
for (const auto& [_, rs_meta] : rs_metas) {
found_versions.insert(rs_meta->version().first);
}
EXPECT_EQ(found_versions.size(), 5);
EXPECT_TRUE(found_versions.contains(0));
EXPECT_TRUE(found_versions.contains(1));
EXPECT_TRUE(found_versions.contains(2));
EXPECT_TRUE(found_versions.contains(3));
EXPECT_TRUE(found_versions.contains(4));
// Verify the hole rowsets (versions 1 and 3) are empty
for (const auto& [_, rs_meta] : rs_metas) {
if (rs_meta->version().first == 1 || rs_meta->version().first == 3) {
EXPECT_TRUE(rs_meta->empty());
EXPECT_EQ(rs_meta->num_rows(), 0);
} else {
EXPECT_FALSE(rs_meta->empty());
EXPECT_EQ(rs_meta->num_rows(), 100);
}
}
}
// Test create_empty_rowset_for_hole function
TEST_F(CloudMetaMgrTest, test_create_empty_rowset_for_hole) {
CloudStorageEngine engine(EngineOptions {});
CloudMetaMgr meta_mgr;
TabletMetaSharedPtr tablet_meta(
new TabletMeta(1001, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}},
UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK, TCompressionType::LZ4F));
auto tablet = std::make_shared<CloudTablet>(engine, std::make_shared<TabletMeta>(*tablet_meta));
// Create a previous rowset meta to pass as reference
auto prev_rs_meta = std::make_shared<RowsetMeta>();
prev_rs_meta->set_tablet_id(1001);
prev_rs_meta->set_index_id(2);
prev_rs_meta->set_partition_id(15673);
prev_rs_meta->set_tablet_uid(UniqueId(9, 10));
prev_rs_meta->set_version(Version(1, 1));
prev_rs_meta->set_rowset_type(BETA_ROWSET);
prev_rs_meta->set_rowset_id(engine.next_rowset_id());
prev_rs_meta->set_num_rows(100);
prev_rs_meta->set_empty(false);
prev_rs_meta->set_tablet_schema(tablet->tablet_schema());
// Test creating an empty rowset for version hole
RowsetSharedPtr hole_rowset;
Status status =
meta_mgr.create_empty_rowset_for_hole(tablet.get(), 2, prev_rs_meta, &hole_rowset);
EXPECT_TRUE(status.ok()) << "Failed to create empty rowset for hole: " << status;
EXPECT_NE(hole_rowset, nullptr);
// Verify the hole rowset properties
auto hole_rs_meta = hole_rowset->rowset_meta();
EXPECT_EQ(hole_rs_meta->tablet_id(), 15673);
EXPECT_EQ(hole_rs_meta->index_id(), 0);
EXPECT_EQ(hole_rs_meta->partition_id(), 2);
EXPECT_EQ(hole_rs_meta->tablet_uid(), UniqueId(9, 10));
EXPECT_EQ(hole_rs_meta->version(), Version(2, 2));
EXPECT_EQ(hole_rs_meta->rowset_type(), BETA_ROWSET);
EXPECT_EQ(hole_rs_meta->num_rows(), 0);
EXPECT_EQ(hole_rs_meta->total_disk_size(), 0);
EXPECT_EQ(hole_rs_meta->data_disk_size(), 0);
EXPECT_EQ(hole_rs_meta->index_disk_size(), 0);
EXPECT_TRUE(hole_rs_meta->empty());
EXPECT_EQ(hole_rs_meta->num_segments(), 0);
EXPECT_EQ(hole_rs_meta->segments_overlap(), NONOVERLAPPING);
EXPECT_EQ(hole_rs_meta->rowset_state(), VISIBLE);
EXPECT_TRUE(hole_rowset->is_hole_rowset());
EXPECT_EQ(hole_rowset->txn_id(), 2); // txn_id should match version
RowsetId expected_rowset_id;
expected_rowset_id.init(2, 0, 15673, 2);
EXPECT_EQ(hole_rowset->rowset_meta()->rowset_id(), expected_rowset_id);
// Test creating multiple hole rowsets with different versions
RowsetSharedPtr hole_rowset_v3;
status = meta_mgr.create_empty_rowset_for_hole(tablet.get(), 3, prev_rs_meta, &hole_rowset_v3);
EXPECT_TRUE(status.ok());
EXPECT_NE(hole_rowset_v3, nullptr);
EXPECT_EQ(hole_rowset_v3->rowset_meta()->version(), Version(3, 3));
EXPECT_TRUE(hole_rowset_v3->is_hole_rowset());
// Verify different hole rowsets have different rowset IDs
EXPECT_NE(hole_rowset->rowset_meta()->rowset_id(), hole_rowset_v3->rowset_meta()->rowset_id());
}
TEST_F(CloudMetaMgrTest, test_fill_version_holes_edge_cases) {
CloudStorageEngine engine(EngineOptions {});
CloudMetaMgr meta_mgr;
// Test case 1: max_version <= 0
{
TabletMetaSharedPtr tablet_meta(new TabletMeta(
1001, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}}, UniqueId(9, 10),
TTabletType::TABLET_TYPE_DISK, TCompressionType::LZ4F));
auto tablet =
std::make_shared<CloudTablet>(engine, std::make_shared<TabletMeta>(*tablet_meta));
std::unique_lock<std::shared_mutex> wlock(tablet->get_header_lock());
Status status = meta_mgr.fill_version_holes(tablet.get(), 0, wlock);
EXPECT_TRUE(status.ok());
status = meta_mgr.fill_version_holes(tablet.get(), -1, wlock);
EXPECT_TRUE(status.ok());
EXPECT_EQ(tablet->tablet_meta()->all_rs_metas().size(), 0);
}
// Test case 2: empty tablet (no existing versions)
{
TabletMetaSharedPtr tablet_meta(new TabletMeta(
1002, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}}, UniqueId(9, 10),
TTabletType::TABLET_TYPE_DISK, TCompressionType::LZ4F));
auto tablet =
std::make_shared<CloudTablet>(engine, std::make_shared<TabletMeta>(*tablet_meta));
std::unique_lock<std::shared_mutex> wlock(tablet->get_header_lock());
Status status = meta_mgr.fill_version_holes(tablet.get(), 5, wlock);
EXPECT_TRUE(status.ok());
// Should still have no rowsets
EXPECT_EQ(tablet->tablet_meta()->all_rs_metas().size(), 0);
}
}
TEST_F(CloudMetaMgrTest, test_fill_version_holes_trailing_holes) {
CloudStorageEngine engine(EngineOptions {});
CloudMetaMgr meta_mgr;
TabletMetaSharedPtr tablet_meta(
new TabletMeta(1003, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}},
UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK, TCompressionType::LZ4F));
auto tablet = std::make_shared<CloudTablet>(engine, std::make_shared<TabletMeta>(*tablet_meta));
// Add only versions 0, 1, 2 but max_version is 5 (missing 3, 4, 5)
std::vector<RowsetSharedPtr> rowsets;
for (int64_t version = 0; version <= 2; ++version) {
auto rs_meta = std::make_shared<RowsetMeta>();
rs_meta->set_tablet_id(1003);
rs_meta->set_index_id(2);
rs_meta->set_partition_id(15673);
rs_meta->set_tablet_uid(UniqueId(9, 10));
rs_meta->set_version(Version(version, version));
rs_meta->set_rowset_type(BETA_ROWSET);
rs_meta->set_rowset_id(engine.next_rowset_id());
rs_meta->set_num_rows(100);
rs_meta->set_empty(false);
rs_meta->set_tablet_schema(tablet->tablet_schema());
RowsetSharedPtr rowset;
auto status = RowsetFactory::create_rowset(tablet->tablet_schema(), tablet->tablet_path(),
rs_meta, &rowset);
EXPECT_TRUE(status.ok());
rowsets.push_back(rowset);
}
// Add all rowsets to tablet
{
std::unique_lock<std::shared_mutex> lock(tablet->get_header_lock());
tablet->add_rowsets(std::move(rowsets), false, lock, false);
}
// Initially we have 3 rowsets (versions 0, 1, 2)
EXPECT_EQ(tablet->tablet_meta()->all_rs_metas().size(), 3);
// Test fill_version_holes to fill trailing holes (versions 3, 4, 5)
std::unique_lock<std::shared_mutex> wlock(tablet->get_header_lock());
Status status = meta_mgr.fill_version_holes(tablet.get(), 5, wlock);
EXPECT_TRUE(status.ok());
// After filling holes, we should have 6 rowsets (versions 0, 1, 2, 3, 4, 5)
EXPECT_EQ(tablet->tablet_meta()->all_rs_metas().size(), 6);
// Verify all versions are present
auto rs_metas = tablet->tablet_meta()->all_rs_metas();
std::set<int64_t> found_versions;
for (const auto& [_, rs_meta] : rs_metas) {
found_versions.insert(rs_meta->version().first);
}
EXPECT_EQ(found_versions.size(), 6);
for (int64_t v = 0; v <= 5; ++v) {
EXPECT_TRUE(found_versions.contains(v)) << "Missing version " << v;
}
// Verify the trailing hole rowsets (versions 3, 4, 5) are empty
for (const auto& [_, rs_meta] : rs_metas) {
if (rs_meta->version().first >= 3) {
EXPECT_TRUE(rs_meta->empty())
<< "Version " << rs_meta->version().first << " should be empty";
EXPECT_EQ(rs_meta->num_rows(), 0);
EXPECT_EQ(rs_meta->total_disk_size(), 0);
} else {
EXPECT_FALSE(rs_meta->empty())
<< "Version " << rs_meta->version().first << " should not be empty";
EXPECT_EQ(rs_meta->num_rows(), 100);
}
}
}
TEST_F(CloudMetaMgrTest, test_fill_version_holes_single_hole) {
CloudStorageEngine engine(EngineOptions {});
CloudMetaMgr meta_mgr;
TabletMetaSharedPtr tablet_meta(
new TabletMeta(1004, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}},
UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK, TCompressionType::LZ4F));
auto tablet = std::make_shared<CloudTablet>(engine, std::make_shared<TabletMeta>(*tablet_meta));
// Add versions 0, 2 (missing only version 1)
std::vector<int64_t> versions = {0, 2};
std::vector<RowsetSharedPtr> rowsets;
for (int64_t version : versions) {
auto rs_meta = std::make_shared<RowsetMeta>();
rs_meta->set_tablet_id(1004);
rs_meta->set_index_id(2);
rs_meta->set_partition_id(15673);
rs_meta->set_tablet_uid(UniqueId(9, 10));
rs_meta->set_version(Version(version, version));
rs_meta->set_rowset_type(BETA_ROWSET);
rs_meta->set_rowset_id(engine.next_rowset_id());
rs_meta->set_num_rows(100);
rs_meta->set_empty(false);
rs_meta->set_tablet_schema(tablet->tablet_schema());
RowsetSharedPtr rowset;
auto status = RowsetFactory::create_rowset(tablet->tablet_schema(), tablet->tablet_path(),
rs_meta, &rowset);
EXPECT_TRUE(status.ok());
rowsets.push_back(rowset);
}
// Add all rowsets to tablet
{
std::unique_lock<std::shared_mutex> lock(tablet->get_header_lock());
tablet->add_rowsets(std::move(rowsets), false, lock, false);
}
// Initially we have 2 rowsets (versions 0, 2)
EXPECT_EQ(tablet->tablet_meta()->all_rs_metas().size(), 2);
// Test fill_version_holes to fill single hole (version 1)
std::unique_lock<std::shared_mutex> wlock(tablet->get_header_lock());
Status status = meta_mgr.fill_version_holes(tablet.get(), 2, wlock);
EXPECT_TRUE(status.ok());
// After filling holes, we should have 3 rowsets (versions 0, 1, 2)
EXPECT_EQ(tablet->tablet_meta()->all_rs_metas().size(), 3);
// Verify all versions are present
auto rs_metas = tablet->tablet_meta()->all_rs_metas();
std::set<int64_t> found_versions;
for (const auto& [_, rs_meta] : rs_metas) {
found_versions.insert(rs_meta->version().first);
}
EXPECT_EQ(found_versions.size(), 3);
EXPECT_TRUE(found_versions.contains(0));
EXPECT_TRUE(found_versions.contains(1));
EXPECT_TRUE(found_versions.contains(2));
// Verify the hole rowset (version 1) is empty
for (const auto& [_, rs_meta] : rs_metas) {
if (rs_meta->version().first == 1) {
EXPECT_TRUE(rs_meta->empty());
EXPECT_EQ(rs_meta->num_rows(), 0);
EXPECT_EQ(rs_meta->total_disk_size(), 0);
} else {
EXPECT_FALSE(rs_meta->empty());
EXPECT_EQ(rs_meta->num_rows(), 100);
}
}
}
TEST_F(CloudMetaMgrTest, test_fill_version_holes_multiple_consecutive_holes) {
CloudStorageEngine engine(EngineOptions {});
CloudMetaMgr meta_mgr;
TabletMetaSharedPtr tablet_meta(
new TabletMeta(1005, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}},
UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK, TCompressionType::LZ4F));
auto tablet = std::make_shared<CloudTablet>(engine, std::make_shared<TabletMeta>(*tablet_meta));
// Add versions 0, 5 (missing 1, 2, 3, 4 - multiple consecutive holes)
std::vector<int64_t> versions = {0, 5};
std::vector<RowsetSharedPtr> rowsets;
for (int64_t version : versions) {
auto rs_meta = std::make_shared<RowsetMeta>();
rs_meta->set_tablet_id(1005);
rs_meta->set_index_id(2);
rs_meta->set_partition_id(15673);
rs_meta->set_tablet_uid(UniqueId(9, 10));
rs_meta->set_version(Version(version, version));
rs_meta->set_rowset_type(BETA_ROWSET);
rs_meta->set_rowset_id(engine.next_rowset_id());
rs_meta->set_num_rows(100);
rs_meta->set_empty(false);
rs_meta->set_tablet_schema(tablet->tablet_schema());
RowsetSharedPtr rowset;
auto status = RowsetFactory::create_rowset(tablet->tablet_schema(), tablet->tablet_path(),
rs_meta, &rowset);
EXPECT_TRUE(status.ok());
rowsets.push_back(rowset);
}
// Add all rowsets to tablet
{
std::unique_lock<std::shared_mutex> lock(tablet->get_header_lock());
tablet->add_rowsets(std::move(rowsets), false, lock, false);
}
// Initially we have 2 rowsets (versions 0, 5)
EXPECT_EQ(tablet->tablet_meta()->all_rs_metas().size(), 2);
// Test fill_version_holes to fill multiple consecutive holes (versions 1, 2, 3, 4)
std::unique_lock<std::shared_mutex> wlock(tablet->get_header_lock());
Status status = meta_mgr.fill_version_holes(tablet.get(), 5, wlock);
EXPECT_TRUE(status.ok());
// After filling holes, we should have 6 rowsets (versions 0, 1, 2, 3, 4, 5)
EXPECT_EQ(tablet->tablet_meta()->all_rs_metas().size(), 6);
// Verify all versions are present
auto rs_metas = tablet->tablet_meta()->all_rs_metas();
std::set<int64_t> found_versions;
for (const auto& [_, rs_meta] : rs_metas) {
found_versions.insert(rs_meta->version().first);
}
EXPECT_EQ(found_versions.size(), 6);
for (int64_t v = 0; v <= 5; ++v) {
EXPECT_TRUE(found_versions.contains(v)) << "Missing version " << v;
}
// Verify the hole rowsets (versions 1, 2, 3, 4) are empty
for (const auto& [_, rs_meta] : rs_metas) {
if (rs_meta->version().first >= 1 && rs_meta->version().first <= 4) {
EXPECT_TRUE(rs_meta->empty())
<< "Version " << rs_meta->version().first << " should be empty";
EXPECT_EQ(rs_meta->num_rows(), 0);
EXPECT_EQ(rs_meta->total_disk_size(), 0);
} else {
EXPECT_FALSE(rs_meta->empty())
<< "Version " << rs_meta->version().first << " should not be empty";
EXPECT_EQ(rs_meta->num_rows(), 100);
}
}
}
TEST_F(CloudMetaMgrTest, test_fill_version_holes_mixed_holes) {
CloudStorageEngine engine(EngineOptions {});
CloudMetaMgr meta_mgr;
TabletMetaSharedPtr tablet_meta(
new TabletMeta(1006, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}},
UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK, TCompressionType::LZ4F));
auto tablet = std::make_shared<CloudTablet>(engine, std::make_shared<TabletMeta>(*tablet_meta));
// Add versions 0, 2, 5, 6 (missing 1, 3, 4 and potential trailing holes up to max_version)
std::vector<int64_t> versions = {0, 2, 5, 6};
std::vector<RowsetSharedPtr> rowsets;
for (int64_t version : versions) {
auto rs_meta = std::make_shared<RowsetMeta>();
rs_meta->set_tablet_id(1006);
rs_meta->set_index_id(2);
rs_meta->set_partition_id(15673);
rs_meta->set_tablet_uid(UniqueId(9, 10));
rs_meta->set_version(Version(version, version));
rs_meta->set_rowset_type(BETA_ROWSET);
rs_meta->set_rowset_id(engine.next_rowset_id());
rs_meta->set_num_rows(100);
rs_meta->set_empty(false);
rs_meta->set_tablet_schema(tablet->tablet_schema());
RowsetSharedPtr rowset;
auto status = RowsetFactory::create_rowset(tablet->tablet_schema(), tablet->tablet_path(),
rs_meta, &rowset);
EXPECT_TRUE(status.ok());
rowsets.push_back(rowset);
}
// Add all rowsets to tablet
{
std::unique_lock<std::shared_mutex> lock(tablet->get_header_lock());
tablet->add_rowsets(std::move(rowsets), false, lock, false);
}
// Initially we have 4 rowsets (versions 0, 2, 5, 6)
EXPECT_EQ(tablet->tablet_meta()->all_rs_metas().size(), 4);
// Test fill_version_holes with max_version = 8 (should fill 1, 3, 4, 7, 8)
std::unique_lock<std::shared_mutex> wlock(tablet->get_header_lock());
Status status = meta_mgr.fill_version_holes(tablet.get(), 8, wlock);
EXPECT_TRUE(status.ok());
// After filling holes, we should have 9 rowsets (versions 0-8)
EXPECT_EQ(tablet->tablet_meta()->all_rs_metas().size(), 9);
// Verify all versions are present
auto rs_metas = tablet->tablet_meta()->all_rs_metas();
std::set<int64_t> found_versions;
for (const auto& [_, rs_meta] : rs_metas) {
found_versions.insert(rs_meta->version().first);
}
EXPECT_EQ(found_versions.size(), 9);
for (int64_t v = 0; v <= 8; ++v) {
EXPECT_TRUE(found_versions.contains(v)) << "Missing version " << v;
}
// Verify the hole rowsets (versions 1, 3, 4, 7, 8) are empty
std::set<int64_t> original_versions = {0, 2, 5, 6};
std::set<int64_t> hole_versions = {1, 3, 4, 7, 8};
for (const auto& [_, rs_meta] : rs_metas) {
int64_t version = rs_meta->version().first;
if (hole_versions.contains(version)) {
EXPECT_TRUE(rs_meta->empty()) << "Version " << version << " should be empty";
EXPECT_EQ(rs_meta->num_rows(), 0);
EXPECT_EQ(rs_meta->total_disk_size(), 0);
} else if (original_versions.contains(version)) {
EXPECT_FALSE(rs_meta->empty()) << "Version " << version << " should not be empty";
EXPECT_EQ(rs_meta->num_rows(), 100);
}
}
}
// Helper class to access private methods for testing
class CloudMetaMgrTestHelper {
public:
static Status call_get_delete_bitmap_from_ms_by_batch(CloudMetaMgr& meta_mgr,
GetDeleteBitmapRequest& req,
GetDeleteBitmapResponse& res,
int64_t bytes_threshold) {
return meta_mgr._get_delete_bitmap_from_ms_by_batch(req, res, bytes_threshold);
}
};
TEST_F(CloudMetaMgrTest, test_get_delete_bitmap_from_ms_by_batch) {
CloudMetaMgr meta_mgr;
auto sp = SyncPoint::get_instance();
// Test case 1: Single batch (no more data)
{
sp->clear_all_call_backs();
sp->enable_processing();
GetDeleteBitmapRequest req;
req.set_tablet_id(12345);
req.set_base_compaction_cnt(1);
req.set_cumulative_compaction_cnt(2);
req.set_cumulative_point(10);
req.set_store_version(100);
req.add_rowset_ids("rowset_1");
req.add_begin_versions(1);
req.add_end_versions(1);
req.add_rowset_ids("rowset_2");
req.add_begin_versions(2);
req.add_end_versions(2);
// Mock the _get_delete_bitmap_from_ms method to return success with no more data
sp->set_call_back("CloudMetaMgr::_get_delete_bitmap_from_ms", [](auto&& args) {
auto* res = try_any_cast<GetDeleteBitmapResponse*>(args[1]);
// Simulate successful response
res->mutable_status()->set_code(MetaServiceCode::OK);
res->add_rowset_ids("rowset_1");
res->add_segment_ids(0);
res->add_versions(1);
res->add_segment_delete_bitmaps("delete_bitmap_1");
res->add_rowset_ids("rowset_2");
res->add_segment_ids(0);
res->add_versions(2);
res->add_segment_delete_bitmaps("delete_bitmap_2");
res->add_returned_rowset_ids("rowset_1");
res->add_returned_rowset_ids("rowset_2");
res->set_has_more(false); // No more data
});
GetDeleteBitmapResponse res;
Status status = CloudMetaMgrTestHelper::call_get_delete_bitmap_from_ms_by_batch(
meta_mgr, req, res, 1024);
EXPECT_TRUE(status.ok()) << "Status: " << status;
EXPECT_EQ(res.rowset_ids_size(), 2);
EXPECT_EQ(res.rowset_ids(0), "rowset_1");
EXPECT_EQ(res.rowset_ids(1), "rowset_2");
EXPECT_EQ(res.segment_delete_bitmaps_size(), 2);
sp->disable_processing();
sp->clear_all_call_backs();
}
// Test case 2: Two batches (has_more = true)
{
sp->clear_all_call_backs();
sp->enable_processing();
GetDeleteBitmapRequest req;
req.set_tablet_id(12345);
req.set_base_compaction_cnt(1);
req.set_cumulative_compaction_cnt(2);
req.set_cumulative_point(10);
req.set_store_version(100);
req.add_rowset_ids("rowset_1");
req.add_begin_versions(1);
req.add_end_versions(1);
req.add_rowset_ids("rowset_2");
req.add_begin_versions(2);
req.add_end_versions(2);
req.add_rowset_ids("rowset_3");
req.add_begin_versions(3);
req.add_end_versions(3);
int call_count = 0;
// Mock the _get_delete_bitmap_from_ms method to simulate two batches
sp->set_call_back("CloudMetaMgr::_get_delete_bitmap_from_ms", [&call_count](auto&& args) {
auto* res = try_any_cast<GetDeleteBitmapResponse*>(args[1]);
call_count++;
res->mutable_status()->set_code(MetaServiceCode::OK);
if (call_count == 1) {
// First batch: return partial data with has_more=true
res->add_rowset_ids("rowset_1");
res->add_segment_ids(0);
res->add_versions(1);
res->add_segment_delete_bitmaps("delete_bitmap_1");
res->add_returned_rowset_ids("rowset_1");
res->set_has_more(true); // More data available
} else if (call_count == 2) {
// Second batch: return remaining data with has_more=false
res->add_rowset_ids("rowset_2");
res->add_segment_ids(0);
res->add_versions(2);
res->add_segment_delete_bitmaps("delete_bitmap_2");
res->add_rowset_ids("rowset_3");
res->add_segment_ids(0);
res->add_versions(3);
res->add_segment_delete_bitmaps("delete_bitmap_3");
res->add_returned_rowset_ids("rowset_2");
res->add_returned_rowset_ids("rowset_3");
res->set_has_more(false); // No more data
}
});
GetDeleteBitmapResponse res;
Status status = CloudMetaMgrTestHelper::call_get_delete_bitmap_from_ms_by_batch(
meta_mgr, req, res, 512);
EXPECT_TRUE(status.ok()) << "Status: " << status;
EXPECT_EQ(call_count, 2); // Should have made 2 RPC calls
EXPECT_EQ(res.rowset_ids_size(), 3);
EXPECT_EQ(res.rowset_ids(0), "rowset_1");
EXPECT_EQ(res.rowset_ids(1), "rowset_2");
EXPECT_EQ(res.rowset_ids(2), "rowset_3");
EXPECT_EQ(res.segment_delete_bitmaps_size(), 3);
sp->disable_processing();
sp->clear_all_call_backs();
}
// Test case 3: Multiple batches (more than 2 batches)
{
sp->clear_all_call_backs();
sp->enable_processing();
GetDeleteBitmapRequest req;
req.set_tablet_id(12345);
req.set_base_compaction_cnt(1);
req.set_cumulative_compaction_cnt(2);
req.set_cumulative_point(10);
req.set_store_version(100);
// Add 5 rowsets to test multiple batches
for (int i = 1; i <= 5; ++i) {
req.add_rowset_ids("rowset_" + std::to_string(i));
req.add_begin_versions(i);
req.add_end_versions(i);
}
int call_count = 0;
// Mock the _get_delete_bitmap_from_ms method to simulate 3 batches
sp->set_call_back("CloudMetaMgr::_get_delete_bitmap_from_ms", [&call_count](auto&& args) {
auto* res = try_any_cast<GetDeleteBitmapResponse*>(args[1]);
call_count++;
res->mutable_status()->set_code(MetaServiceCode::OK);
if (call_count == 1) {
// First batch: return rowset_1 and rowset_2
res->add_rowset_ids("rowset_1");
res->add_segment_ids(0);
res->add_versions(1);
res->add_segment_delete_bitmaps("delete_bitmap_1");
res->add_rowset_ids("rowset_2");
res->add_segment_ids(0);
res->add_versions(2);
res->add_segment_delete_bitmaps("delete_bitmap_2");
res->add_returned_rowset_ids("rowset_1");
res->add_returned_rowset_ids("rowset_2");
res->set_has_more(true); // More data available
} else if (call_count == 2) {
// Second batch: return rowset_3 and rowset_4
res->add_rowset_ids("rowset_3");
res->add_segment_ids(0);
res->add_versions(3);
res->add_segment_delete_bitmaps("delete_bitmap_3");
res->add_rowset_ids("rowset_4");
res->add_segment_ids(0);
res->add_versions(4);
res->add_segment_delete_bitmaps("delete_bitmap_4");
res->add_returned_rowset_ids("rowset_3");
res->add_returned_rowset_ids("rowset_4");
res->set_has_more(true); // Still more data available
} else if (call_count == 3) {
// Third batch: return rowset_5
res->add_rowset_ids("rowset_5");
res->add_segment_ids(0);
res->add_versions(5);
res->add_segment_delete_bitmaps("delete_bitmap_5");
res->add_returned_rowset_ids("rowset_5");
res->set_has_more(false); // No more data
}
});
GetDeleteBitmapResponse res;
Status status = CloudMetaMgrTestHelper::call_get_delete_bitmap_from_ms_by_batch(
meta_mgr, req, res, 256); // Small threshold to force multiple batches
EXPECT_TRUE(status.ok()) << "Status: " << status;
EXPECT_EQ(call_count, 3); // Should have made 3 RPC calls
EXPECT_EQ(res.rowset_ids_size(), 5);
EXPECT_EQ(res.segment_delete_bitmaps_size(), 5);
for (int i = 1; i <= 5; ++i) {
EXPECT_EQ(res.rowset_ids(i - 1), "rowset_" + std::to_string(i));
EXPECT_EQ(res.segment_delete_bitmaps(i - 1), "delete_bitmap_" + std::to_string(i));
}
sp->disable_processing();
sp->clear_all_call_backs();
}
// Test case 4: RPC failure
{
sp->clear_all_call_backs();
sp->enable_processing();
GetDeleteBitmapRequest req;
req.set_tablet_id(12345);
req.add_rowset_ids("rowset_1");
req.add_begin_versions(1);
req.add_end_versions(1);
// Mock to simulate connection failure or service unavailable
sp->set_call_back("CloudMetaMgr::_get_delete_bitmap_from_ms", [](auto&& args) {
auto* res = try_any_cast<GetDeleteBitmapResponse*>(args[1]);
res->mutable_status()->set_code(MetaServiceCode::TABLET_NOT_FOUND);
res->mutable_status()->set_msg("Tablet not found");
});
GetDeleteBitmapResponse res;
Status status = CloudMetaMgrTestHelper::call_get_delete_bitmap_from_ms_by_batch(
meta_mgr, req, res, 1024);
// The method should handle the error from _get_delete_bitmap_from_ms
// Since _get_delete_bitmap_from_ms_by_batch calls RETURN_IF_ERROR, it should propagate the error
EXPECT_FALSE(status.ok());
sp->disable_processing();
sp->clear_all_call_backs();
}
// Test case 5: V2 delete bitmap handling with multiple batches
{
sp->clear_all_call_backs();
sp->enable_processing();
GetDeleteBitmapRequest req;
req.set_tablet_id(12345);
req.add_rowset_ids("rowset_1");
req.add_begin_versions(1);
req.add_end_versions(1);
req.add_rowset_ids("rowset_2");
req.add_begin_versions(2);
req.add_end_versions(2);
int call_count = 0;
// Mock to return v2 delete bitmap data across multiple batches
sp->set_call_back("CloudMetaMgr::_get_delete_bitmap_from_ms", [&call_count](auto&& args) {
auto* res = try_any_cast<GetDeleteBitmapResponse*>(args[1]);
call_count++;
res->mutable_status()->set_code(MetaServiceCode::OK);
if (call_count == 1) {
// First batch: v2 delete bitmap data with actual content (stored in FDB)
res->add_delta_rowset_ids("delta_rowset_1");
auto* storage1 = res->add_delete_bitmap_storages();
storage1->set_store_in_fdb(true); // Has delete bitmap data
auto* delete_bitmap1 = storage1->mutable_delete_bitmap();
delete_bitmap1->add_rowset_ids("rowset_1");
delete_bitmap1->add_segment_ids(0);
delete_bitmap1->add_versions(1);
delete_bitmap1->add_segment_delete_bitmaps("v2_bitmap_1");
res->add_returned_rowset_ids("rowset_1");
res->set_has_more(true);
} else if (call_count == 2) {
// Second batch: v2 delete bitmap without local data
res->add_delta_rowset_ids("delta_rowset_2");
auto* storage2 = res->add_delete_bitmap_storages();
storage2->set_store_in_fdb(false); // No local bitmap data
res->add_returned_rowset_ids("rowset_2");
res->set_has_more(false);
}
});
GetDeleteBitmapResponse res;
Status status = CloudMetaMgrTestHelper::call_get_delete_bitmap_from_ms_by_batch(
meta_mgr, req, res, 512);
EXPECT_TRUE(status.ok()) << "Status: " << status;
EXPECT_EQ(call_count, 2);
EXPECT_EQ(res.delta_rowset_ids_size(), 2);
EXPECT_EQ(res.delta_rowset_ids(0), "delta_rowset_1");
EXPECT_EQ(res.delta_rowset_ids(1), "delta_rowset_2");
EXPECT_EQ(res.delete_bitmap_storages_size(), 2);
// First storage: store_in_fdb=true, has delete bitmap data
EXPECT_TRUE(res.delete_bitmap_storages(0).store_in_fdb());
EXPECT_TRUE(res.delete_bitmap_storages(0).has_delete_bitmap());
EXPECT_EQ(res.delete_bitmap_storages(0).delete_bitmap().rowset_ids(0), "rowset_1");
EXPECT_EQ(res.delete_bitmap_storages(0).delete_bitmap().segment_delete_bitmaps(0),
"v2_bitmap_1");
// Second storage: store_in_fdb=false, no local bitmap data
EXPECT_FALSE(res.delete_bitmap_storages(1).store_in_fdb());
EXPECT_FALSE(res.delete_bitmap_storages(1).has_delete_bitmap());
sp->disable_processing();
sp->clear_all_call_backs();
}
// Test case 6: Mixed V1 and V2 delete bitmap handling
{
sp->clear_all_call_backs();
sp->enable_processing();
GetDeleteBitmapRequest req;
req.set_tablet_id(12345);
req.add_rowset_ids("rowset_1");
req.add_begin_versions(1);
req.add_end_versions(1);
req.add_rowset_ids("rowset_2");
req.add_begin_versions(2);
req.add_end_versions(2);
req.add_rowset_ids("rowset_3");
req.add_begin_versions(3);
req.add_end_versions(3);
int call_count = 0;
// Mock to return both v1 and v2 delete bitmap data in multiple batches
sp->set_call_back("CloudMetaMgr::_get_delete_bitmap_from_ms", [&call_count](auto&& args) {
auto* res = try_any_cast<GetDeleteBitmapResponse*>(args[1]);
call_count++;
res->mutable_status()->set_code(MetaServiceCode::OK);
if (call_count == 1) {
// First batch: return both v1 and v2 data
// V1 delete bitmap data
res->add_rowset_ids("rowset_1");
res->add_segment_ids(0);
res->add_versions(1);
res->add_segment_delete_bitmaps("v1_delete_bitmap_1");
// V2 delete bitmap data - has data when store_in_fdb=true
res->add_delta_rowset_ids("delta_rowset_1");
auto* storage1 = res->add_delete_bitmap_storages();
storage1->set_store_in_fdb(true); // Has delete bitmap data
auto* delete_bitmap1 = storage1->mutable_delete_bitmap();
delete_bitmap1->add_rowset_ids("rowset_1");
delete_bitmap1->add_segment_ids(0);
delete_bitmap1->add_versions(1);
delete_bitmap1->add_segment_delete_bitmaps("v2_bitmap_1");
res->add_returned_rowset_ids("rowset_1");
res->set_has_more(true);
} else if (call_count == 2) {
// Second batch: return more mixed v1 and v2 data
// More V1 data
res->add_rowset_ids("rowset_2");
res->add_segment_ids(1);
res->add_versions(2);
res->add_segment_delete_bitmaps("v1_delete_bitmap_2");
res->add_rowset_ids("rowset_3");
res->add_segment_ids(0);
res->add_versions(3);
res->add_segment_delete_bitmaps("v1_delete_bitmap_3");
// More V2 data - mixed scenarios
res->add_delta_rowset_ids("delta_rowset_2");
auto* storage2 = res->add_delete_bitmap_storages();
storage2->set_store_in_fdb(true); // Has delete bitmap data
auto* delete_bitmap2 = storage2->mutable_delete_bitmap();
delete_bitmap2->add_rowset_ids("rowset_2");
delete_bitmap2->add_segment_ids(1);
delete_bitmap2->add_versions(2);
delete_bitmap2->add_segment_delete_bitmaps("v2_bitmap_2");
res->add_delta_rowset_ids("delta_rowset_3");
auto* storage3 = res->add_delete_bitmap_storages();
storage3->set_store_in_fdb(false); // No local bitmap data
res->add_returned_rowset_ids("rowset_2");
res->add_returned_rowset_ids("rowset_3");
res->set_has_more(false);
}
});
GetDeleteBitmapResponse res;
Status status = CloudMetaMgrTestHelper::call_get_delete_bitmap_from_ms_by_batch(
meta_mgr, req, res, 512);
EXPECT_TRUE(status.ok()) << "Status: " << status;
EXPECT_EQ(call_count, 2);
// Verify V1 delete bitmap data was merged correctly
EXPECT_EQ(res.rowset_ids_size(), 3);
EXPECT_EQ(res.rowset_ids(0), "rowset_1");
EXPECT_EQ(res.rowset_ids(1), "rowset_2");
EXPECT_EQ(res.rowset_ids(2), "rowset_3");
EXPECT_EQ(res.segment_ids_size(), 3);
EXPECT_EQ(res.segment_ids(0), 0);
EXPECT_EQ(res.segment_ids(1), 1);
EXPECT_EQ(res.segment_ids(2), 0);
EXPECT_EQ(res.versions_size(), 3);
EXPECT_EQ(res.versions(0), 1);
EXPECT_EQ(res.versions(1), 2);
EXPECT_EQ(res.versions(2), 3);
EXPECT_EQ(res.segment_delete_bitmaps_size(), 3);
EXPECT_EQ(res.segment_delete_bitmaps(0), "v1_delete_bitmap_1");
EXPECT_EQ(res.segment_delete_bitmaps(1), "v1_delete_bitmap_2");
EXPECT_EQ(res.segment_delete_bitmaps(2), "v1_delete_bitmap_3");
// Verify V2 delete bitmap data was merged correctly
EXPECT_EQ(res.delta_rowset_ids_size(), 3);
EXPECT_EQ(res.delta_rowset_ids(0), "delta_rowset_1");
EXPECT_EQ(res.delta_rowset_ids(1), "delta_rowset_2");
EXPECT_EQ(res.delta_rowset_ids(2), "delta_rowset_3");
EXPECT_EQ(res.delete_bitmap_storages_size(), 3);
// First storage: store_in_fdb=true, has delete bitmap data
EXPECT_TRUE(res.delete_bitmap_storages(0).store_in_fdb());
EXPECT_TRUE(res.delete_bitmap_storages(0).has_delete_bitmap());
EXPECT_EQ(res.delete_bitmap_storages(0).delete_bitmap().rowset_ids(0), "rowset_1");
EXPECT_EQ(res.delete_bitmap_storages(0).delete_bitmap().segment_delete_bitmaps(0),
"v2_bitmap_1");
// Second storage: store_in_fdb=true, has delete bitmap data
EXPECT_TRUE(res.delete_bitmap_storages(1).store_in_fdb());
EXPECT_TRUE(res.delete_bitmap_storages(1).has_delete_bitmap());
EXPECT_EQ(res.delete_bitmap_storages(1).delete_bitmap().rowset_ids(0), "rowset_2");
EXPECT_EQ(res.delete_bitmap_storages(1).delete_bitmap().segment_delete_bitmaps(0),
"v2_bitmap_2");
// Third storage: store_in_fdb=false, no local bitmap data
EXPECT_FALSE(res.delete_bitmap_storages(2).store_in_fdb());
EXPECT_FALSE(res.delete_bitmap_storages(2).has_delete_bitmap());
sp->disable_processing();
sp->clear_all_call_backs();
}
}
} // namespace doris