blob: e51bdc524782cbeda9469c0597f2282d1a87e0c4 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "recycler/snapshot_data_migrator.h"
#include <gen_cpp/cloud.pb.h>
#include "common/config.h"
#include "common/defer.h"
#include "common/stopwatch.h"
#include "common/util.h"
#include "meta-store/keys.h"
#include "mock_accessor.h"
#include "recycler/hdfs_accessor.h"
#include "recycler/s3_accessor.h"
#include "recycler/util.h"
namespace doris::cloud {
SnapshotDataMigrator::SnapshotDataMigrator(std::shared_ptr<TxnKv> txn_kv)
: txn_kv_(std::move(txn_kv)) {}
SnapshotDataMigrator::~SnapshotDataMigrator() {
if (!stopped()) {
stop();
}
}
int SnapshotDataMigrator::start() {
workers_.emplace_back([this]() { scan_instance_loop(); });
workers_.emplace_back([this] { lease_migration_jobs(); });
for (int i = 0; i < config::snapshot_data_migrator_concurrent; ++i) {
workers_.emplace_back([this]() { migration_loop(); });
}
LOG_INFO("snapshot data migrator started")
.tag("concurrent", config::snapshot_data_migrator_concurrent);
return 0;
}
void SnapshotDataMigrator::stop() {
stopped_ = true;
notifier_.notify_all();
pending_instance_cond_.notify_all();
{
std::lock_guard lock(mtx_);
for (auto& [_, migrator] : migrating_instance_map_) {
migrator->stop();
}
}
for (auto& w : workers_) {
if (w.joinable()) w.join();
}
}
void SnapshotDataMigrator::migration_loop() {
pthread_setname_np(pthread_self(), "SNAP_MIGRATOR");
while (!stopped()) {
// fetch instance to check
InstanceInfoPB instance;
{
std::unique_lock lock(mtx_);
pending_instance_cond_.wait(
lock, [&]() -> bool { return !pending_instance_queue_.empty() || stopped(); });
if (stopped()) {
return;
}
instance = std::move(pending_instance_queue_.front());
pending_instance_queue_.pop_front();
pending_instance_set_.erase(instance.instance_id());
}
const auto& instance_id = instance.instance_id();
{
std::lock_guard lock(mtx_);
// skip instance in recycling
if (migrating_instance_map_.count(instance_id)) continue;
}
auto migrator = std::make_shared<InstanceDataMigrator>(txn_kv_, instance);
if (migrator->init() != 0) {
LOG(WARNING) << "failed to init instance migrator, instance_id="
<< instance.instance_id();
continue;
}
std::string job_key = job_snapshot_data_migrator_key(instance.instance_id());
int ret =
prepare_instance_recycle_job(txn_kv_.get(), job_key, instance.instance_id(),
ip_port_, config::recycle_job_lease_expired_ms * 1000);
if (ret != 0) { // Prepare failed
continue;
} else {
std::lock_guard lock(mtx_);
migrating_instance_map_.emplace(instance_id, migrator);
}
if (stopped()) return;
migrator->do_migrate();
{
std::lock_guard lock(mtx_);
migrating_instance_map_.erase(instance.instance_id());
}
}
}
void SnapshotDataMigrator::scan_instance_loop() {
std::this_thread::sleep_for(
std::chrono::seconds(config::recycler_sleep_before_scheduling_seconds));
while (!stopped()) {
std::vector<InstanceInfoPB> instances;
get_all_instances(txn_kv_.get(), instances);
LOG(INFO) << "Snapshot data migrator get instances: " << [&instances] {
std::stringstream ss;
for (auto& i : instances) ss << ' ' << i.instance_id();
return ss.str();
}();
if (!instances.empty()) {
// enqueue instances
std::lock_guard lock(mtx_);
for (auto& instance : instances) {
if (instance.status() == InstanceInfoPB::DELETED) continue;
if (!is_instance_need_migrate(instance)) continue;
auto [_, success] = pending_instance_set_.insert(instance.instance_id());
// skip instance already in pending queue
if (success) {
pending_instance_queue_.push_back(std::move(instance));
}
}
pending_instance_cond_.notify_all();
}
{
std::unique_lock lock(mtx_);
notifier_.wait_for(lock, std::chrono::seconds(config::scan_instances_interval_seconds),
[&]() { return stopped(); });
}
}
}
void SnapshotDataMigrator::lease_migration_jobs() {
while (!stopped()) {
std::vector<std::string> instances;
instances.reserve(migrating_instance_map_.size());
{
std::lock_guard lock(mtx_);
for (auto& [id, _] : migrating_instance_map_) {
instances.push_back(id);
}
}
for (auto& i : instances) {
std::string job_key = job_snapshot_data_migrator_key(i);
int ret = lease_instance_recycle_job(txn_kv_.get(), job_key, i, ip_port_);
if (ret == 1) {
std::lock_guard lock(mtx_);
if (auto it = migrating_instance_map_.find(i);
it != migrating_instance_map_.end()) {
it->second->stop();
}
}
}
{
std::unique_lock lock(mtx_);
notifier_.wait_for(lock,
std::chrono::milliseconds(config::recycle_job_lease_expired_ms / 3),
[&]() { return stopped(); });
}
}
}
bool SnapshotDataMigrator::is_instance_need_migrate(const InstanceInfoPB& instance_info) {
// The multi-version feature is enabled, but the snapshot feature is not ready.
return instance_info.multi_version_status() != MultiVersionStatus::MULTI_VERSION_DISABLED &&
instance_info.snapshot_switch_status() == SnapshotSwitchStatus::SNAPSHOT_SWITCH_DISABLED;
}
InstanceDataMigrator::InstanceDataMigrator(std::shared_ptr<TxnKv> txn_kv,
const InstanceInfoPB& instance)
: txn_kv_(std::move(txn_kv)),
instance_id_(instance.instance_id()),
instance_info_(instance) {}
InstanceDataMigrator::~InstanceDataMigrator() {
if (!stopped()) {
stop();
}
}
int InstanceDataMigrator::init() {
int ret = init_obj_store_accessors();
if (ret != 0) {
return ret;
}
return init_storage_vault_accessors();
}
int InstanceDataMigrator::init_obj_store_accessors() {
for (const auto& obj_info : instance_info_.obj_info()) {
#ifdef UNIT_TEST
auto accessor = std::make_shared<MockAccessor>();
#else
auto s3_conf = S3Conf::from_obj_store_info(obj_info);
if (!s3_conf) {
LOG(WARNING) << "failed to init object accessor, instance_id=" << instance_id_;
return -1;
}
std::shared_ptr<S3Accessor> accessor;
int ret = S3Accessor::create(std::move(*s3_conf), &accessor);
if (ret != 0) {
LOG(WARNING) << "failed to init object accessor. instance_id=" << instance_id_
<< " resource_id=" << obj_info.id();
return ret;
}
#endif
accessor_map_.emplace(obj_info.id(), std::move(accessor));
}
return 0;
}
int InstanceDataMigrator::init_storage_vault_accessors() {
if (instance_info_.resource_ids().empty()) {
return 0;
}
FullRangeGetOptions opts(txn_kv_);
opts.prefetch = true;
auto it = txn_kv_->full_range_get(storage_vault_key({instance_id_, ""}),
storage_vault_key({instance_id_, "\xff"}), std::move(opts));
for (auto kv = it->next(); kv.has_value(); kv = it->next()) {
auto [k, v] = *kv;
StorageVaultPB vault;
if (!vault.ParseFromArray(v.data(), v.size())) {
LOG(WARNING) << "malformed storage vault, unable to deserialize key=" << hex(k);
return -1;
}
TEST_SYNC_POINT_CALLBACK("InstanceRecycler::init_storage_vault_accessors.mock_vault",
&accessor_map_, &vault);
if (vault.has_hdfs_info()) {
#ifdef ENABLE_HDFS_STORAGE_VAULT
auto accessor = std::make_shared<HdfsAccessor>(vault.hdfs_info());
int ret = accessor->init();
if (ret != 0) {
LOG(WARNING) << "failed to init hdfs accessor. instance_id=" << instance_id_
<< " resource_id=" << vault.id() << " name=" << vault.name();
return ret;
}
accessor_map_.emplace(vault.id(), std::move(accessor));
#else
LOG(ERROR) << "HDFS is disabled (via the ENABLE_HDFS_STORAGE_VAULT build option), "
<< "but HDFS storage vaults were detected";
#endif
} else if (vault.has_obj_info()) {
#ifdef UNIT_TEST
auto accessor = std::make_shared<MockAccessor>();
#else
auto s3_conf = S3Conf::from_obj_store_info(vault.obj_info());
if (!s3_conf) {
LOG(WARNING) << "failed to init object accessor, instance_id=" << instance_id_;
return -1;
}
std::shared_ptr<S3Accessor> accessor;
int ret = S3Accessor::create(std::move(*s3_conf), &accessor);
if (ret != 0) {
LOG(WARNING) << "failed to init s3 accessor. instance_id=" << instance_id_
<< " resource_id=" << vault.id() << " name=" << vault.name();
return ret;
}
#endif
accessor_map_.emplace(vault.id(), std::move(accessor));
}
}
if (!it->is_valid()) {
LOG_WARNING("failed to get storage vault kv");
return -1;
}
return 0;
}
int InstanceDataMigrator::do_migrate() {
LOG_WARNING("start data migration").tag("instance_id", instance_id_);
StopWatch stop_watch;
DORIS_CLOUD_DEFER {
LOG_WARNING("data migration done")
.tag("instance_id", instance_id_)
.tag("cost(sec)", stop_watch.elapsed_seconds());
};
SnapshotManager snapshot_mgr(txn_kv_);
int res = snapshot_mgr.migrate_to_versioned_keys(this);
if (res != 0) {
LOG_WARNING("failed to migrate snapshot keys").tag("instance_id", instance_id_);
return res;
}
return enable_instance_snapshot_switch();
}
int InstanceDataMigrator::enable_instance_snapshot_switch() {
std::unique_ptr<Transaction> txn;
TxnErrorCode err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
LOG_WARNING("failed to create txn for data migration").tag("instance_id", instance_id_);
return -1;
}
std::string key = instance_key(instance_id_);
std::string instance_value;
err = txn->get(key, &instance_value);
if (err != TxnErrorCode::TXN_OK) {
LOG_WARNING("failed to get instance info in data migration")
.tag("instance_id", instance_id_)
.tag("error", err);
return -1;
}
InstanceInfoPB instance_info;
if (!instance_info.ParseFromString(instance_value)) {
LOG_WARNING("failed to parse instance info in data migration")
.tag("instance_id", instance_id_);
return -1;
}
if (instance_info.multi_version_status() == MultiVersionStatus::MULTI_VERSION_DISABLED) {
LOG_WARNING("instance multi version status is disabled, no need to enable snapshot switch")
.tag("instance_id", instance_id_);
return 0;
}
instance_info.set_snapshot_switch_status(SnapshotSwitchStatus::SNAPSHOT_SWITCH_OFF);
instance_info.clear_migrated_key_sets();
txn->atomic_add(system_meta_service_instance_update_key(), 1);
txn->put(key, instance_info.SerializeAsString());
err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
LOG_WARNING("failed to commit instance info in data migration")
.tag("instance_id", instance_id_)
.tag("error", err);
return -1;
}
LOG_WARNING("finish data migration").tag("instance_id", instance_id_);
return 0;
}
} // namespace doris::cloud