// blob: ba0d8b87d05ff130cb5b17babb30288fdf70245a [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <fmt/format.h>
#include <gen_cpp/AgentService_types.h>
#include <gen_cpp/Descriptors_types.h>
#include <gen_cpp/Types_types.h>
#include <gen_cpp/types.pb.h>
#include <gtest/gtest-message.h>
#include <gtest/gtest-test-part.h>
#include <stdint.h>
#include <unistd.h>
#include <iostream>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "common/config.h"
#include "common/object_pool.h"
#include "common/status.h"
#include "exec/tablet_info.h"
#include "gen_cpp/internal_service.pb.h"
#include "gtest/gtest_pred_impl.h"
#include "io/fs/file_reader.h"
#include "io/fs/file_system.h"
#include "io/fs/file_writer.h"
#include "io/fs/local_file_system.h"
#include "io/fs/path.h"
#include "io/fs/remote_file_system.h"
#include "olap/delta_writer.h"
#include "olap/olap_common.h"
#include "olap/options.h"
#include "olap/rowset/beta_rowset.h"
#include "olap/storage_engine.h"
#include "olap/storage_policy.h"
#include "olap/tablet.h"
#include "olap/tablet_manager.h"
#include "olap/tablet_meta.h"
#include "olap/task/engine_publish_version_task.h"
#include "olap/txn_manager.h"
#include "runtime/define_primitive_type.h"
#include "runtime/descriptor_helper.h"
#include "runtime/descriptors.h"
#include "vec/columns/column.h"
#include "vec/core/block.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/runtime/vdatetime_value.h"
namespace doris {
// Forward declarations: within this file both types are only used through pointers.
class OlapMeta;
struct Slice;
// Storage engine shared by all tests in this file. It is created and opened in
// TabletCooldownTest::SetUpTestSuite() and released in TearDownTestSuite().
static std::unique_ptr<StorageEngine> k_engine;
// Directory, relative to the current working directory, that SetUpTestSuite() uses as
// config::storage_root_path.
static const std::string kTestDir = "ut_dir/tablet_cooldown_test";
// Id under which the mock remote file system is registered as a storage resource.
static constexpr int64_t kResourceId = 10000;
// Id under which the test storage policy is registered.
static constexpr int64_t kStoragePolicyId = 10002;
// Base identifiers passed to createTablet()/write_rowset(); the tests add small offsets
// to them when they need more than one tablet.
static constexpr int64_t kTabletId = 10005;
static constexpr int64_t kReplicaId = 10009;
static constexpr int32_t kSchemaHash = 270068377;
static constexpr int32_t kTxnId = 20003;
static constexpr int32_t kPartitionId = 30003;
using io::Path;
// Mock remote file system instance; assigned in TabletCooldownTest::SetUpTestSuite().
static io::RemoteFileSystemSPtr s_fs;
// Maps a path of the mock remote file system onto the local path that backs it:
// "<config::storage_root_path>/remote/<path>".
static std::string get_remote_path(const Path& path) {
    const std::string relative = path.string();
    std::string local_path = fmt::format("{}/remote/{}", config::storage_root_path, relative);
    return local_path;
}
// Test double for a remote file writer. Every operation is forwarded to a local file
// writer created at get_remote_path(<path>), so data "uploaded" through this writer ends
// up under the local storage root.
class FileWriterMock : public io::FileWriter {
public:
    // Creates the backing local file. A constructor cannot return a Status, so a creation
    // failure is logged to stderr and remembered; it is then reported by close(),
    // appendv() and finalize() instead of dereferencing a null writer.
    FileWriterMock(Path path) : io::FileWriter(std::move(path), io::global_local_filesystem()) {
        _create_status = io::global_local_filesystem()->create_file(get_remote_path(_path),
                                                                    &_local_file_writer);
        if (!_create_status.ok()) {
            std::cerr << "create file writer failed: " << _create_status << std::endl;
        }
    }

    ~FileWriterMock() override = default;

    // Closes the backing local writer, or returns the creation error if there is none.
    Status close() override {
        if (_local_file_writer == nullptr) {
            return _create_status;
        }
        return _local_file_writer->close();
    }

    // Appends `data_cnt` slices starting at `data` to the backing local writer, or
    // returns the creation error if there is none.
    Status appendv(const Slice* data, size_t data_cnt) override {
        if (_local_file_writer == nullptr) {
            return _create_status;
        }
        return _local_file_writer->appendv(data, data_cnt);
    }

    // Finalizes the backing local writer, or returns the creation error if there is none.
    Status finalize() override {
        if (_local_file_writer == nullptr) {
            return _create_status;
        }
        return _local_file_writer->finalize();
    }

private:
    // Local writer that actually stores the bytes; null when creation failed.
    std::unique_ptr<io::FileWriter> _local_file_writer;
    // Result of creating _local_file_writer in the constructor.
    Status _create_status;
};
// Test double for a remote file system. Each operation is redirected to a local file
// system, with every "remote" path translated through get_remote_path().
class RemoteFileSystemMock : public io::RemoteFileSystem {
public:
    RemoteFileSystemMock(Path root_path, std::string&& id, io::FileSystemType type)
            : RemoteFileSystem(std::move(root_path), std::move(id), type) {
        _local_fs = io::LocalFileSystem::create(get_remote_path(_root_path));
    }

    ~RemoteFileSystemMock() override = default;

protected:
    // Hands out a FileWriterMock for `path`; `opts` is ignored.
    Status create_file_impl(const Path& path, io::FileWriterPtr* writer,
                            const io::FileWriterOptions* opts = nullptr) override {
        Path fs_path = path;
        *writer = std::make_unique<FileWriterMock>(fs_path);
        return Status::OK();
    }

    // Creates the backing local directory; `failed_if_exists` is ignored.
    Status create_directory_impl(const Path& path, bool failed_if_exists) override {
        return _local_fs->create_directory(get_remote_path(path));
    }

    Status delete_file_impl(const Path& path) override {
        return _local_fs->delete_file(get_remote_path(path));
    }

    // Deletes the files one by one and stops at the first error.
    Status batch_delete_impl(const std::vector<Path>& paths) override {
        for (const Path& path : paths) {
            RETURN_IF_ERROR(delete_file(path));
        }
        return Status::OK();
    }

    Status delete_directory_impl(const Path& path) override {
        return _local_fs->delete_directory(get_remote_path(path));
    }

    Status exists_impl(const Path& path, bool* res) const override {
        return _local_fs->exists(get_remote_path(path), res);
    }

    Status file_size_impl(const Path& path, int64_t* file_size) const override {
        return _local_fs->file_size(get_remote_path(path), file_size);
    }

    // Lists the backing local directory.
    // NOTE(review): `regular_file` is not forwarded; `true` is always passed to the local
    // file system. Confirm this is intended before relying on the parameter.
    Status list_impl(const Path& dir, bool regular_file, std::vector<io::FileInfo>* files,
                     bool* exists) override {
        RETURN_IF_ERROR(_local_fs->list(get_remote_path(dir), true, files, exists));
        // for (auto& path : local_paths) {
        //     files->emplace_back(path.file_name.substr(config::storage_root_path.size() + 1));
        // }
        return Status::OK();
    }

    // "Uploads" a file by linking it to its backing location on the local file system.
    Status upload_impl(const Path& local_path, const Path& dest_path) override {
        return _local_fs->link_file(local_path, get_remote_path(dest_path));
    }

    // Uploads local_paths[i] to dest_paths[i] and stops at the first error. The two
    // vectors must have the same length; a mismatch is rejected up front instead of
    // reading past the end of `dest_paths`.
    Status batch_upload_impl(const std::vector<Path>& local_paths,
                             const std::vector<Path>& dest_paths) override {
        if (local_paths.size() != dest_paths.size()) {
            return Status::InternalError(
                    "batch upload path count mismatch: {} local paths, {} dest paths",
                    local_paths.size(), dest_paths.size());
        }
        for (size_t i = 0; i < local_paths.size(); ++i) {
            RETURN_IF_ERROR(upload_impl(local_paths[i], dest_paths[i]));
        }
        return Status::OK();
    }

    // Not needed by the tests: does nothing and reports success.
    Status download_impl(const Path& remote_file, const Path& local_file) override {
        return Status::OK();
    }

    // Opens the backing local file for reading; `opts` is ignored.
    Status open_file_internal(const Path& file, io::FileReaderSPtr* reader,
                              const io::FileReaderOptions& opts) override {
        auto path = get_remote_path(file);
        return _local_fs->open_file(path, reader);
    }

    // Nothing to connect to: always succeeds.
    Status connect_impl() override { return Status::OK(); }

    // Not needed by the tests: does nothing and reports success.
    Status rename_impl(const Path& orig_name, const Path& new_name) override {
        return Status::OK();
    }

private:
    // Local file system that stores everything written to the mock.
    std::shared_ptr<io::LocalFileSystem> _local_fs;
};
// Fixture for the tablet cooldown tests. The suite-level setup registers a mock remote
// storage resource and a storage policy, prepares an empty storage root under the current
// working directory and opens a StorageEngine shared by all tests.
class TabletCooldownTest : public testing::Test {
public:
    static void SetUpTestSuite() {
        // Register the mock remote file system as storage resource kResourceId.
        s_fs = std::make_shared<RemoteFileSystemMock>("", std::to_string(kResourceId),
                                                      io::FileSystemType::S3);
        StorageResource resource = {s_fs, 1};
        put_storage_resource(kResourceId, resource);

        // Register a policy whose cooldown time is one second in the past.
        auto storage_policy = std::make_shared<StoragePolicy>();
        storage_policy->name = "TabletCooldownTest";
        storage_policy->version = 1;
        storage_policy->resource_id = kResourceId;
        storage_policy->cooldown_datetime = UnixSeconds() - 1;
        put_storage_policy(kStoragePolicyId, storage_policy);

        // Fatal on failure: `buffer` must not be read if getcwd() did not fill it.
        constexpr uint32_t MAX_PATH_LEN = 1024;
        char buffer[MAX_PATH_LEN];
        ASSERT_NE(getcwd(buffer, MAX_PATH_LEN), nullptr);
        config::storage_root_path = std::string(buffer) + "/" + kTestDir;
        config::min_file_descriptor_number = 1000;

        // Start from an empty storage root.
        auto st = io::global_local_filesystem()->delete_directory(config::storage_root_path);
        ASSERT_TRUE(st.ok()) << st;
        st = io::global_local_filesystem()->create_directory(config::storage_root_path);
        ASSERT_TRUE(st.ok()) << st;

        std::vector<StorePath> paths {{config::storage_root_path, -1}};
        EngineOptions options;
        options.store_paths = paths;
        k_engine = std::make_unique<StorageEngine>(options);
        st = k_engine->open();
        // Fatal on failure: an engine that failed to open must not be handed to ExecEnv.
        ASSERT_TRUE(st.ok()) << st.to_string();

        ExecEnv* exec_env = doris::ExecEnv::GetInstance();
        exec_env->set_write_cooldown_meta_executors(); // default cons
        // NOTE(review): ownership of this limiter is not visible from this file; the
        // teardown only replaces the pointer with nullptr.
        exec_env->set_memtable_memory_limiter(new MemTableMemoryLimiter());
        exec_env->set_storage_engine(k_engine.get());
    }

    static void TearDownTestSuite() {
        k_engine.reset();
        ExecEnv* exec_env = doris::ExecEnv::GetInstance();
        exec_env->set_storage_engine(nullptr);
        exec_env->set_memtable_memory_limiter(nullptr);
    }
};
static void create_tablet_request_with_sequence_col(int64_t tablet_id, int32_t schema_hash,
TCreateTabletReq* request) {
request->tablet_id = tablet_id;
request->partition_id = 30003;
request->__set_version(1);
request->tablet_schema.schema_hash = schema_hash;
request->tablet_schema.short_key_column_count = 2;
request->tablet_schema.keys_type = TKeysType::UNIQUE_KEYS;
request->tablet_schema.storage_type = TStorageType::COLUMN;
request->tablet_schema.__set_sequence_col_idx(2);
request->__set_storage_format(TStorageFormat::V2);
TColumn k1;
k1.column_name = "k1";
k1.__set_is_key(true);
k1.column_type.type = TPrimitiveType::TINYINT;
request->tablet_schema.columns.push_back(k1);
TColumn k2;
k2.column_name = "k2";
k2.__set_is_key(true);
k2.column_type.type = TPrimitiveType::SMALLINT;
request->tablet_schema.columns.push_back(k2);
TColumn sequence_col;
sequence_col.column_name = SEQUENCE_COL;
sequence_col.__set_is_key(false);
sequence_col.column_type.type = TPrimitiveType::INT;
sequence_col.__set_aggregation_type(TAggregationType::REPLACE);
request->tablet_schema.columns.push_back(sequence_col);
TColumn v1;
v1.column_name = "v1";
v1.__set_is_key(false);
v1.column_type.type = TPrimitiveType::DATETIME;
v1.__set_aggregation_type(TAggregationType::REPLACE);
request->tablet_schema.columns.push_back(v1);
}
// Builds a descriptor table containing a single tuple with four slots, in column order:
// k1 (TINYINT), k2 (SMALLINT), the sequence column (INT) and v1 (DATETIME). Only the last
// two slots are explicitly marked as non-nullable.
static TDescriptorTable create_descriptor_tablet_with_sequence_col() {
    const auto k1_slot =
            TSlotDescriptorBuilder().type(TYPE_TINYINT).column_name("k1").column_pos(0).build();
    const auto k2_slot =
            TSlotDescriptorBuilder().type(TYPE_SMALLINT).column_name("k2").column_pos(1).build();
    const auto sequence_slot = TSlotDescriptorBuilder()
                                       .type(TYPE_INT)
                                       .column_name(SEQUENCE_COL)
                                       .column_pos(2)
                                       .nullable(false)
                                       .build();
    const auto v1_slot = TSlotDescriptorBuilder()
                                 .type(TYPE_DATETIME)
                                 .column_name("v1")
                                 .column_pos(3)
                                 .nullable(false)
                                 .build();

    // Slots are registered in column order, then the tuple is added to the table.
    TTupleDescriptorBuilder tuple_builder;
    tuple_builder.add_slot(k1_slot);
    tuple_builder.add_slot(k2_slot);
    tuple_builder.add_slot(sequence_slot);
    tuple_builder.add_slot(v1_slot);

    TDescriptorTableBuilder desc_tbl_builder;
    tuple_builder.build(&desc_tbl_builder);
    return desc_tbl_builder.desc_tbl();
}
// Writes one rowset into the tablet identified by (tablet_id, schema_hash) through a
// DeltaWriter, commits the transaction and publishes it as the version following the
// tablet's current max version.
//
//   tablet       [out] set to the tablet looked up from the tablet manager after commit.
//   load_id      load id stored in the write request.
//   replica_id   unused by this function.
//   schema_hash, tablet_id, txn_id, partition_id
//                copied into the write request.
//   tuple_desc   describes the columns of the block that is written.
//   with_data    when true a single row (123, 456, 1, "2020-07-16 19:39:43") is written;
//                when false the writer is closed without writing any row.
//
// The function uses gtest ASSERT_* macros, so on failure it returns early; callers must
// not assume `*tablet` is valid afterwards.
static void write_rowset(TabletSharedPtr* tablet, PUniqueId load_id, int64_t replica_id,
                         int32_t schema_hash, int64_t tablet_id, int64_t txn_id,
                         int64_t partition_id, TupleDescriptor* tuple_desc, bool with_data = true) {
    auto profile = std::make_unique<RuntimeProfile>("LoadChannels");
    WriteRequest write_req;
    write_req.tablet_id = tablet_id;
    write_req.schema_hash = schema_hash;
    write_req.txn_id = txn_id;
    write_req.partition_id = partition_id;
    write_req.load_id = load_id;
    write_req.tuple_desc = tuple_desc;
    write_req.slots = &(tuple_desc->slots());
    write_req.is_high_priority = false;
    write_req.table_schema_param = std::make_shared<OlapTableSchemaParam>();
    auto delta_writer =
            std::make_unique<DeltaWriter>(*k_engine, &write_req, profile.get(), TUniqueId {});
    ASSERT_NE(delta_writer, nullptr);

    // Build an empty block with one column per slot of the tuple descriptor.
    vectorized::Block block;
    for (const auto& slot_desc : tuple_desc->slots()) {
        block.insert(vectorized::ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(),
                                                       slot_desc->get_data_type_ptr(),
                                                       slot_desc->col_name()));
    }

    Status st;
    auto columns = block.mutate_columns();
    if (with_data) {
        // Every insert_data() call passes the size of the value it actually points to.
        const int8_t c1 = 123;
        columns[0]->insert_data(reinterpret_cast<const char*>(&c1), sizeof(c1));
        const int16_t c2 = 456;
        columns[1]->insert_data(reinterpret_cast<const char*>(&c2), sizeof(c2));
        const int32_t c3 = 1;
        columns[2]->insert_data(reinterpret_cast<const char*>(&c3), sizeof(c3));
        VecDateTimeValue c4;
        c4.from_date_str("2020-07-16 19:39:43", 19);
        const int64_t c4_int = c4.to_int64();
        columns[3]->insert_data(reinterpret_cast<const char*>(&c4_int), sizeof(c4_int));
        st = delta_writer->write(&block, {0});
        ASSERT_EQ(Status::OK(), st);
    }

    st = delta_writer->close();
    ASSERT_EQ(Status::OK(), st);
    st = delta_writer->build_rowset();
    ASSERT_EQ(Status::OK(), st);
    st = delta_writer->commit_txn(PSlaveTabletNodes());
    ASSERT_EQ(Status::OK(), st);

    // publish version success
    *tablet = k_engine->tablet_manager()->get_tablet(write_req.tablet_id, write_req.schema_hash);
    // Stop here instead of dereferencing a tablet the manager could not find.
    ASSERT_NE(*tablet, nullptr);
    OlapMeta* meta = (*tablet)->data_dir()->get_meta();

    // The new rowset is published as the single version right after the current max.
    const int64_t next_version = (*tablet)->rowset_with_max_version()->end_version() + 1;
    Version version;
    version.first = next_version;
    version.second = next_version;

    std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs;
    k_engine->txn_manager()->get_txn_related_tablets(write_req.txn_id, write_req.partition_id,
                                                     &tablet_related_rs);
    for (auto& tablet_rs : tablet_related_rs) {
        RowsetSharedPtr rowset = tablet_rs.second;
        TabletPublishStatistics stats;
        st = k_engine->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id,
                                                  (*tablet)->tablet_id(), (*tablet)->tablet_uid(),
                                                  version, &stats);
        ASSERT_EQ(Status::OK(), st);
        st = (*tablet)->add_inc_rowset(rowset);
        ASSERT_EQ(Status::OK(), st);
    }
}
// Creates a tablet with the sequence-column schema and, unless `with_data` is false,
// writes one row into it through write_rowset().
//
//   tablet       [out] receives the created tablet.
//   replica_id, schema_hash, tablet_id
//                identify the tablet to create.
//   txn_id, partition_id
//                forwarded to write_rowset() when data is written.
//   with_data    when false the tablet is returned right after creation, without any
//                additional rowset.
//
// The function uses gtest ASSERT_* macros, so on failure it returns early and `*tablet`
// may be left unset; callers should check it before use.
void createTablet(TabletSharedPtr* tablet, int64_t replica_id, int32_t schema_hash,
                  int64_t tablet_id, int64_t txn_id, int64_t partition_id, bool with_data = true) {
    // Recreate the directory that backs this tablet on the mock remote file system.
    EXPECT_TRUE(io::global_local_filesystem()
                        ->delete_directory(get_remote_path(remote_tablet_path(tablet_id)))
                        .ok());
    EXPECT_TRUE(io::global_local_filesystem()
                        ->create_directory(get_remote_path(remote_tablet_path(tablet_id)))
                        .ok());

    // create tablet
    std::unique_ptr<RuntimeProfile> profile;
    profile = std::make_unique<RuntimeProfile>("CreateTablet");
    TCreateTabletReq request;
    create_tablet_request_with_sequence_col(tablet_id, schema_hash, &request);
    request.__set_replica_id(replica_id);
    Status st = k_engine->create_tablet(request, profile.get());
    ASSERT_EQ(Status::OK(), st);
    if (!with_data) {
        *tablet = k_engine->tablet_manager()->get_tablet(tablet_id);
        return;
    }

    TDescriptorTable tdesc_tbl = create_descriptor_tablet_with_sequence_col();
    ObjectPool obj_pool;
    DescriptorTbl* desc_tbl = nullptr;
    // Check the result instead of discarding it: `desc_tbl` is dereferenced right below.
    st = DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
    ASSERT_TRUE(st.ok()) << st;
    ASSERT_NE(desc_tbl, nullptr);
    TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
    ASSERT_NE(tuple_desc, nullptr);

    PUniqueId load_id;
    load_id.set_hi(0);
    load_id.set_lo(0);
    write_rowset(tablet, std::move(load_id), replica_id, schema_hash, tablet_id, txn_id,
                 partition_id, tuple_desc);
    // write_rowset() returns early on a fatal assertion and may not have set `*tablet`.
    ASSERT_NE(*tablet, nullptr);
    EXPECT_EQ(1, (*tablet)->num_rows());
}
// Cools down a tablet holding one data rowset and checks that rowset [2-2] becomes
// non-local and that its segments can still be loaded.
TEST_F(TabletCooldownTest, normal) {
    TabletSharedPtr tablet1;
    createTablet(&tablet1, kReplicaId, kSchemaHash, kTabletId, kTxnId, kPartitionId);
    // createTablet() returns early on a fatal assertion and may leave the tablet unset.
    ASSERT_NE(tablet1, nullptr);

    // test cooldown
    tablet1->set_storage_policy_id(kStoragePolicyId);
    // Expected to fail while update_cooldown_conf() has not been called yet.
    Status st = tablet1->cooldown(); // rowset [0-1]
    ASSERT_NE(Status::OK(), st);
    tablet1->update_cooldown_conf(1, kReplicaId);
    // cooldown for upload node
    st = tablet1->cooldown(); // rowset [0-1]
    ASSERT_EQ(Status::OK(), st);
    st = tablet1->cooldown(); // rowset [2-2]
    ASSERT_EQ(Status::OK(), st);

    auto rs = tablet1->get_rowset_by_version({2, 2});
    ASSERT_NE(rs, nullptr);
    ASSERT_FALSE(rs->is_local());

    // test read
    std::vector<segment_v2::SegmentSharedPtr> segments;
    st = std::static_pointer_cast<BetaRowset>(rs)->load_segments(&segments);
    ASSERT_EQ(Status::OK(), st);
    ASSERT_EQ(segments.size(), 1);
}
// Checks that a tablet without any written data is not a cooldown candidate, and that a
// tablet that has already been cooled down can still cool down after an empty rowset is
// written into it.
TEST_F(TabletCooldownTest, cooldown_data) {
    TabletSharedPtr tablet1;
    createTablet(&tablet1, kReplicaId + 1, kSchemaHash + 1, kTabletId + 1, kTxnId + 1,
                 kPartitionId + 1, false);
    // createTablet() returns early on a fatal assertion and may leave the tablet unset.
    ASSERT_NE(tablet1, nullptr);
    // test cooldown
    tablet1->set_storage_policy_id(kStoragePolicyId);
    // Tablet with only rowset[0-1] will not be as suitable as cooldown candidate
    ASSERT_FALSE(tablet1->_has_data_to_cooldown());

    TabletSharedPtr tablet2;
    createTablet(&tablet2, kReplicaId + 2, kSchemaHash + 2, kTabletId + 2, kTxnId + 2,
                 kPartitionId + 2);
    ASSERT_NE(tablet2, nullptr);
    // test cooldown
    tablet2->set_storage_policy_id(kStoragePolicyId);
    // Expected to fail while update_cooldown_conf() has not been called yet.
    Status st = tablet2->cooldown(); // rowset [0-1]
    ASSERT_NE(Status::OK(), st);
    tablet2->update_cooldown_conf(1, kReplicaId + 2);
    // cooldown for upload node
    st = tablet2->cooldown(); // rowset [0-1]
    ASSERT_EQ(Status::OK(), st);
    st = tablet2->cooldown(); // rowset [2-2]
    ASSERT_EQ(Status::OK(), st);

    // Write one empty local rowset into tablet2 to test if this rowset would be uploaded or not
    TDescriptorTable tdesc_tbl = create_descriptor_tablet_with_sequence_col();
    ObjectPool obj_pool;
    DescriptorTbl* desc_tbl = nullptr;
    // Check the result instead of discarding it: `desc_tbl` is dereferenced right below.
    st = DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
    ASSERT_TRUE(st.ok()) << st;
    ASSERT_NE(desc_tbl, nullptr);
    TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
    ASSERT_NE(tuple_desc, nullptr);

    PUniqueId load_id;
    load_id.set_hi(1);
    load_id.set_lo(1);
    write_rowset(&tablet2, std::move(load_id), kReplicaId + 2, kSchemaHash + 2, kTabletId + 2,
                 kTxnId + 3, kPartitionId + 2, tuple_desc, false);
    // write_rowset() overwrites tablet2 and may return early on a fatal assertion.
    ASSERT_NE(tablet2, nullptr);
    st = tablet2->cooldown(); // rowset [3-3]
    ASSERT_EQ(Status::OK(), st);
}
} // namespace doris