blob: 6797782d5d665c399033b6f975ab3765148b4e2f [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "recycler/util.h"
#include <glog/logging.h>
#include <cstdint>
#include "common/util.h"
#include "meta-service/keys.h"
#include "meta-service/meta_service_schema.h"
#include "meta-service/txn_kv.h"
#include "meta-service/txn_kv_error.h"
namespace doris::cloud {
namespace config {
// Lease length, in milliseconds, granted to the host that takes or renews a
// recycle job: it is added to the current wall-clock time (ms) to compute a
// JobRecyclePB's expiration_time_ms below. Defined in another translation unit.
extern int32_t recycle_job_lease_expired_ms;
} // namespace config
/**
 * Scan every instance record in the instance key range and append the parsed
 * InstanceInfoPB values to `res`.
 *
 * @param txn_kv KV store used to create the read transaction
 * @param res    output vector; parsed instances are appended. Existing content
 *               is not cleared, and on failure `res` may hold a partial result.
 * @return 0 on success; -1 if the transaction cannot be created, a range read
 *         fails, or a stored value cannot be parsed
 */
int get_all_instances(TxnKv* txn_kv, std::vector<InstanceInfoPB>& res) {
    InstanceKeyInfo key0_info {""};
    InstanceKeyInfo key1_info {"\xff"}; // instance ids are human-readable strings, so "\xff" bounds them
    std::string key0;
    std::string key1;
    instance_key(key0_info, &key0);
    instance_key(key1_info, &key1);

    std::unique_ptr<Transaction> txn;
    TxnErrorCode err = txn_kv->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        LOG(WARNING) << "failed to init txn, err=" << err;
        return -1;
    }

    std::unique_ptr<RangeGetIterator> it;
    do {
        // Reuse the outer `err` rather than shadowing it with a new local.
        err = txn->get(key0, key1, &it);
        if (err != TxnErrorCode::TXN_OK) {
            LOG(WARNING) << "failed to get instance, err=" << err;
            return -1;
        }
        while (it->has_next()) {
            auto [k, v] = it->next();
            // Remember the last key of this batch so the next range read resumes after it.
            if (!it->has_next()) key0 = k;
            InstanceInfoPB instance_info;
            if (!instance_info.ParseFromArray(v.data(), v.size())) {
                LOG(WARNING) << "malformed instance info, key=" << hex(k);
                return -1;
            }
            res.push_back(std::move(instance_info));
        }
        // Appending '\x00' gives the smallest key strictly greater than the last one read.
        key0.push_back('\x00');
    } while (it->more());
    return 0;
}
/**
 * Try to take the recycle job stored at `key` for `instance_id` on behalf of
 * the host `ip_port`.
 *
 * The job is taken in three cases:
 * - no record exists yet;
 * - the stored record cannot be parsed;
 * - the record is not held under an unexpired BUSY lease and more than
 *   `interval_ms` has elapsed since its `last_ctime_ms`.
 *
 * Taking the job writes a BUSY record owned by `ip_port`. Its lease expires
 * `config::recycle_job_lease_expired_ms` from now.
 *
 * @return 0 if the job was taken, 1 if it should not run now, -1 on KV error
 */
int prepare_instance_recycle_job(TxnKv* txn_kv, std::string_view key,
                                 const std::string& instance_id, const std::string& ip_port,
                                 int64_t interval_ms) {
    std::string val;
    std::unique_ptr<Transaction> txn;
    TxnErrorCode err = txn_kv->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        LOG(WARNING) << "failed to create txn, err=" << err;
        return -1;
    }
    err = txn->get(key, &val);
    if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
        LOG(WARNING) << "failed to get kv, err=" << err << " key=" << hex(key);
        return -1;
    }

    using namespace std::chrono;
    auto now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
    JobRecyclePB job_info;
    // Decides whether an existing record may be taken over. It is only invoked
    // when the key exists, and it parses `val` into `job_info` as a side effect.
    auto is_expired = [&]() {
        if (!job_info.ParseFromString(val)) {
            LOG(WARNING) << "failed to parse JobRecyclePB, key=" << hex(key);
            // The record is corrupt, so just recycle it. A failed parse can leave
            // fields partially populated; clear them so the record written below
            // does not inherit garbage (e.g. a bogus last_ctime_ms).
            job_info.Clear();
            return true;
        }
        DCHECK(job_info.instance_id() == instance_id);
        // Another host still holds an unexpired lease on the job.
        if (job_info.status() == JobRecyclePB::BUSY && now < job_info.expiration_time_ms()) {
            LOG(INFO) << "job is busy. host=" << job_info.ip_port()
                      << " expiration=" << job_info.expiration_time_ms()
                      << " instance=" << instance_id;
            return false;
        }
        // Throttle: require more than `interval_ms` since the last recorded ctime.
        bool finish_expired = now - job_info.last_ctime_ms() > interval_ms;
        if (!finish_expired) {
            LOG(INFO) << "the time since last finished job is too short. host="
                      << job_info.ip_port() << " ctime=" << job_info.last_ctime_ms()
                      << " instance=" << instance_id;
        }
        return finish_expired;
    };
    if (err == TxnErrorCode::TXN_KEY_NOT_FOUND || is_expired()) {
        // Fields not set here (e.g. last_ctime_ms) are carried over from the
        // previously stored record, if one was parsed.
        job_info.set_status(JobRecyclePB::BUSY);
        job_info.set_instance_id(instance_id);
        job_info.set_ip_port(ip_port);
        job_info.set_expiration_time_ms(now + config::recycle_job_lease_expired_ms);
        val = job_info.SerializeAsString();
        txn->put(key, val);
        err = txn->commit();
        if (err != TxnErrorCode::TXN_OK) {
            LOG(WARNING) << "failed to commit, err=" << err << " key=" << hex(key);
            return -1;
        }
        return 0;
    }
    return 1;
}
/**
 * Mark the recycle job at `key` as IDLE after the host `ip_port` has finished
 * running it.
 *
 * Nothing is written unless the stored record exists, parses, is owned by
 * `ip_port`, and is BUSY. If the commit fails, the whole read-modify-write is
 * repeated, for at most four attempts in total. All failures are only logged.
 *
 * @param success  when true, the finish time is also stored as last_success_time_ms
 * @param ctime_ms stored as the record's last_ctime_ms
 */
void finish_instance_recycle_job(TxnKv* txn_kv, std::string_view key,
                                 const std::string& instance_id, const std::string& ip_port,
                                 bool success, int64_t ctime_ms) {
    std::string val;
    for (int retry_times = 0; retry_times <= 3; ++retry_times) {
        std::unique_ptr<Transaction> txn;
        if (txn_kv->create_txn(&txn) != TxnErrorCode::TXN_OK) {
            LOG(WARNING) << "failed to create txn";
            return;
        }
        if (TxnErrorCode get_err = txn->get(key, &val); get_err != TxnErrorCode::TXN_OK) {
            LOG(WARNING) << "failed to get kv, err=" << get_err << " key=" << hex(key);
            return;
        }

        JobRecyclePB job_info;
        if (!job_info.ParseFromString(val)) {
            LOG(WARNING) << "failed to parse JobRecyclePB, key=" << hex(key);
            return;
        }
        DCHECK(job_info.instance_id() == instance_id);
        if (job_info.ip_port() != ip_port) {
            LOG(WARNING) << "job is doing at other machine: " << job_info.ip_port()
                         << " key=" << hex(key);
            return;
        }
        if (job_info.status() != JobRecyclePB::BUSY) {
            LOG(WARNING) << "job is not busy, key=" << hex(key);
            return;
        }

        const auto finish_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
                                       std::chrono::system_clock::now().time_since_epoch())
                                       .count();
        job_info.set_status(JobRecyclePB::IDLE);
        job_info.set_instance_id(instance_id);
        job_info.set_last_finish_time_ms(finish_ms);
        job_info.set_last_ctime_ms(ctime_ms);
        if (success) job_info.set_last_success_time_ms(finish_ms);

        val = job_info.SerializeAsString();
        txn->put(key, val);
        TxnErrorCode commit_err = txn->commit();
        if (commit_err == TxnErrorCode::TXN_OK) {
            LOG(INFO) << "succ to commit to finish recycle job, key=" << hex(key);
            return;
        }
        // Presumably a conflict with a concurrent commit (e.g. lease renewal); retry.
        LOG(WARNING) << "failed to commit to finish recycle job, err=" << commit_err
                     << " key=" << hex(key) << " retry_times=" << retry_times;
    }
    LOG(WARNING) << "finally failed to commit to finish recycle job, key=" << hex(key);
}
/**
 * Extend the lease on the BUSY recycle job at `key` that is held by the host
 * `ip_port`. The new expiration is now + `config::recycle_job_lease_expired_ms`.
 *
 * @return 0 if the lease was extended; 1 if the stored record cannot be parsed,
 *         is owned by another host, or is not BUSY; -1 on KV errors, including
 *         a missing key
 */
int lease_instance_recycle_job(TxnKv* txn_kv, std::string_view key, const std::string& instance_id,
                               const std::string& ip_port) {
    std::unique_ptr<Transaction> txn;
    if (txn_kv->create_txn(&txn) != TxnErrorCode::TXN_OK) {
        LOG(WARNING) << "failed to create txn";
        return -1;
    }
    std::string val;
    if (TxnErrorCode get_err = txn->get(key, &val); get_err != TxnErrorCode::TXN_OK) {
        LOG(WARNING) << "failed to get kv, err=" << get_err << " key=" << hex(key);
        return -1;
    }

    JobRecyclePB job_info;
    if (!job_info.ParseFromString(val)) {
        LOG(WARNING) << "failed to parse JobRecyclePB, key=" << hex(key);
        return 1;
    }
    DCHECK(job_info.instance_id() == instance_id);
    if (job_info.ip_port() != ip_port) {
        LOG(WARNING) << "job is doing at other machine: " << job_info.ip_port()
                     << " key=" << hex(key);
        return 1;
    }
    if (job_info.status() != JobRecyclePB::BUSY) {
        LOG(WARNING) << "job is not busy, key=" << hex(key);
        return 1;
    }

    const auto now_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
                                std::chrono::system_clock::now().time_since_epoch())
                                .count();
    job_info.set_expiration_time_ms(now_ms + config::recycle_job_lease_expired_ms);
    val = job_info.SerializeAsString();
    txn->put(key, val);
    if (TxnErrorCode commit_err = txn->commit(); commit_err != TxnErrorCode::TXN_OK) {
        LOG(WARNING) << "failed to commit, failed to lease recycle job, err=" << commit_err
                     << " key=" << hex(key);
        return -1;
    }
    return 0;
}
/**
 * Read the TabletIndexPB of `tablet_id` within `instance_id` into `tablet_idx`.
 *
 * @return 0 on success; -1 if the transaction cannot be created, the index key
 *         cannot be read, the stored value is malformed, or the stored tablet
 *         id differs from `tablet_id`
 */
int get_tablet_idx(TxnKv* txn_kv, const std::string& instance_id, int64_t tablet_id,
                   TabletIndexPB& tablet_idx) {
    std::unique_ptr<Transaction> txn;
    if (txn_kv->create_txn(&txn) != TxnErrorCode::TXN_OK) {
        LOG(WARNING) << "failed to create txn";
        return -1;
    }

    std::string idx_key;
    meta_tablet_idx_key({instance_id, tablet_id}, &idx_key);
    std::string idx_val;
    if (TxnErrorCode get_err = txn->get(idx_key, &idx_val); get_err != TxnErrorCode::TXN_OK) {
        LOG(WARNING) << fmt::format("failed to get tablet_idx, err={} tablet_id={} key={}",
                                    get_err, tablet_id, hex(idx_key));
        return -1;
    }

    if (!tablet_idx.ParseFromString(idx_val)) [[unlikely]] {
        LOG(WARNING) << fmt::format("malformed tablet index value, tablet_id={} key={}",
                                    tablet_id, hex(idx_key));
        return -1;
    }
    // Sanity check: the stored index must describe the tablet that was asked for.
    if (tablet_idx.tablet_id() != tablet_id) [[unlikely]] {
        LOG(WARNING) << "unexpected error given_tablet_id=" << tablet_id
                     << " idx_pb_tablet_id=" << tablet_idx.tablet_id() << " key=" << hex(idx_key);
        return -1;
    }
    return 0;
}
/**
 * Read the TabletMetaCloudPB of `tablet_id` within `instance_id` into
 * `tablet_meta`.
 *
 * The tablet's index record is resolved first (via get_tablet_idx) to obtain
 * the table/index/partition ids that make up the tablet meta key.
 *
 * @return 0 on success; the negative code from get_tablet_idx if the index
 *         lookup fails; -1 if the transaction cannot be created, the meta key
 *         cannot be read, or the stored value is malformed
 */
int get_tablet_meta(TxnKv* txn_kv, const std::string& instance_id, int64_t tablet_id,
                    TabletMetaCloudPB& tablet_meta) {
    TabletIndexPB tablet_idx;
    if (int ret = get_tablet_idx(txn_kv, instance_id, tablet_id, tablet_idx); ret < 0) {
        return ret;
    }
    const int64_t table_id = tablet_idx.table_id();
    const int64_t index_id = tablet_idx.index_id();
    const int64_t partition_id = tablet_idx.partition_id();

    std::unique_ptr<Transaction> txn;
    if (txn_kv->create_txn(&txn) != TxnErrorCode::TXN_OK) {
        LOG(WARNING) << "failed to create txn";
        return -1;
    }

    std::string meta_key;
    meta_tablet_key({instance_id, table_id, index_id, partition_id, tablet_id}, &meta_key);
    std::string meta_val;
    if (TxnErrorCode get_err = txn->get(meta_key, &meta_val); get_err != TxnErrorCode::TXN_OK) {
        LOG(WARNING) << fmt::format(
                "failed to get tablet, err={}, table_id={}, index_id={}, partition_id={}, "
                "tablet_id={} key={}",
                get_err, table_id, index_id, partition_id, tablet_id, hex(meta_key));
        return -1;
    }

    if (!tablet_meta.ParseFromString(meta_val)) [[unlikely]] {
        LOG(WARNING) << fmt::format("malformed tablet meta, tablet_id={} key={}", tablet_id,
                                    hex(meta_key));
        return -1;
    }
    return 0;
}
} // namespace doris::cloud