blob: 811fc219b99e8e78853b668932314f4789e621dd [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <fmt/format.h>
#include <gen_cpp/cloud.pb.h>
#include <chrono>
#include <memory>
#include "common/defer.h"
#include "common/logging.h"
#include "common/stats.h"
#include "meta-service/meta_service_helper.h"
#include "meta-store/blob_message.h"
#include "meta-store/clone_chain_reader.h"
#include "meta-store/keys.h"
#include "meta-store/meta_reader.h"
#include "meta-store/txn_kv_error.h"
#include "meta-store/versioned_value.h"
#include "meta_service.h"
namespace doris::cloud {
using check_create_table_type = std::function<const std::tuple<
const ::google::protobuf::RepeatedField<int64_t>, std::string,
std::function<std::string(std::string, int64_t)>>(const CheckKVRequest* request)>;
// ATTN: xxx_id MUST NOT be reused
//
// UNKNOWN
// |
// +----------+---------+
// | |
// (prepare_xxx) (drop_xxx)
// | |
// v v
// PREPARED--(drop_xxx)-->DROPPED
// | |
// |----------+---------+
// | |
// | (begin_recycle_xxx)
// | |
// (commit_xxx) v
// | RECYCLING RECYCLING --(drop_xxx)-> RECYCLING
// | |
// | (finish_recycle_xxx) UNKNOWN --(commit_xxx)-> UNKNOWN
// | | if xxx exists
// +----------+
// |
// v
// UNKNOWN
// Return TXN_OK if exists, TXN_KEY_NOT_FOUND if not exists, otherwise error
static TxnErrorCode index_exists(Transaction* txn, const std::string& instance_id,
const IndexRequest* req) {
auto tablet_key = meta_tablet_key({instance_id, req->table_id(), req->index_ids(0), 0, 0});
auto tablet_key_end =
meta_tablet_key({instance_id, req->table_id(), req->index_ids(0), INT64_MAX, 0});
std::unique_ptr<RangeGetIterator> it;
TxnErrorCode err = txn->get(tablet_key, tablet_key_end, &it, false, 1);
if (err != TxnErrorCode::TXN_OK) {
LOG_WARNING("failed to get kv").tag("err", err);
return err;
}
return it->has_next() ? TxnErrorCode::TXN_OK : TxnErrorCode::TXN_KEY_NOT_FOUND;
}
static TxnErrorCode check_recycle_key_exist(Transaction* txn, const std::string& key) {
std::string val;
return txn->get(key, &val);
}
void MetaServiceImpl::prepare_index(::google::protobuf::RpcController* controller,
const IndexRequest* request, IndexResponse* response,
::google::protobuf::Closure* done) {
RPC_PREPROCESS(prepare_index, get, put);
instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
if (instance_id.empty()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "empty instance_id";
return;
}
AnnotateTag tag_instance_id("instance_id", instance_id);
RPC_RATE_LIMIT(prepare_index)
if (request->index_ids().empty() || !request->has_table_id()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "empty index_ids or table_id";
return;
}
TxnErrorCode err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
msg = "failed to create txn";
return;
}
bool is_versioned_read = is_version_read_enabled(instance_id);
CloneChainReader reader(instance_id, resource_mgr_.get());
if (!is_versioned_read) {
err = index_exists(txn.get(), instance_id, request);
} else {
err = reader.is_index_exists(txn.get(), request->index_ids(0));
}
// If index has existed, this might be a stale request
if (err == TxnErrorCode::TXN_OK) {
code = MetaServiceCode::ALREADY_EXISTED;
msg = "index already existed";
return;
}
if (err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
code = cast_as<ErrCategory::READ>(err);
msg = fmt::format("failed to check index existence, err={}", err);
return;
}
std::string to_save_val;
{
RecycleIndexPB pb;
pb.set_table_id(request->table_id());
pb.set_creation_time(::time(nullptr));
pb.set_expiration(request->expiration());
pb.set_state(RecycleIndexPB::PREPARED);
pb.SerializeToString(&to_save_val);
}
for (auto index_id : request->index_ids()) {
auto key = recycle_index_key({instance_id, index_id});
std::string val;
err = txn->get(key, &val);
if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { // UNKNOWN
LOG_INFO("put recycle index").tag("key", hex(key));
txn->put(key, to_save_val);
continue;
}
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
msg = fmt::format("failed to get kv, err={}", err);
LOG_WARNING(msg);
return;
}
RecycleIndexPB pb;
if (!pb.ParseFromString(val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
msg = "malformed recycle index value";
LOG_WARNING(msg).tag("index_id", index_id);
return;
}
if (pb.state() != RecycleIndexPB::PREPARED) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = fmt::format("invalid recycle index state: {}",
RecycleIndexPB::State_Name(pb.state()));
return;
}
// else, duplicate request, OK
}
err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::COMMIT>(err);
msg = fmt::format("failed to commit txn: {}", err);
return;
}
}
void MetaServiceImpl::commit_index(::google::protobuf::RpcController* controller,
const IndexRequest* request, IndexResponse* response,
::google::protobuf::Closure* done) {
RPC_PREPROCESS(commit_index, get, put, del);
instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
if (instance_id.empty()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "empty instance_id";
return;
}
RPC_RATE_LIMIT(commit_index)
if (request->index_ids().empty() || !request->has_table_id()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "empty index_ids or table_id";
return;
}
TxnErrorCode err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::CREATE>(err);
msg = "failed to create txn";
return;
}
CommitIndexLogPB commit_index_log;
commit_index_log.set_db_id(request->db_id());
commit_index_log.set_table_id(request->table_id());
bool is_versioned_read = is_version_read_enabled(instance_id);
bool is_versioned_write = is_version_write_enabled(instance_id);
if (!request->has_db_id()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "db_id is required for versioned write, please upgrade your FE version";
return;
}
CloneChainReader reader(instance_id, resource_mgr_.get());
for (auto index_id : request->index_ids()) {
auto key = recycle_index_key({instance_id, index_id});
std::string val;
err = txn->get(key, &val);
if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { // UNKNOWN
if (!is_versioned_read) {
err = index_exists(txn.get(), instance_id, request);
} else {
err = reader.is_index_exists(txn.get(), index_id);
}
// If index has existed, this might be a duplicate request
if (err == TxnErrorCode::TXN_OK) {
return; // Index committed, OK
}
if (err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
code = cast_as<ErrCategory::READ>(err);
msg = "failed to check index existence";
return;
}
// Index recycled
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "index has been recycled";
return;
}
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
msg = fmt::format("failed to get kv, err={}", err);
LOG_WARNING(msg);
return;
}
RecycleIndexPB pb;
if (!pb.ParseFromString(val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
msg = "malformed recycle index value";
LOG_WARNING(msg).tag("index_id", index_id);
return;
}
if (pb.state() != RecycleIndexPB::PREPARED) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = fmt::format("invalid recycle index state: {}",
RecycleIndexPB::State_Name(pb.state()));
return;
}
LOG_INFO("remove recycle index").tag("key", hex(key));
txn->remove(key);
// Save the index meta/index keys
if (is_versioned_write) {
int64_t db_id = request->db_id();
int64_t table_id = request->table_id();
std::string index_meta_key = versioned::meta_index_key({instance_id, index_id});
std::string index_index_key = versioned::index_index_key({instance_id, index_id});
std::string index_inverted_key =
versioned::index_inverted_key({instance_id, db_id, table_id, index_id});
IndexIndexPB index_index_pb;
index_index_pb.set_db_id(db_id);
index_index_pb.set_table_id(table_id);
LOG(INFO) << index_index_pb.DebugString();
std::string index_index_value;
if (!index_index_pb.SerializeToString(&index_index_value)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
msg = fmt::format("failed to serialize IndexIndexPB");
LOG_WARNING(msg).tag("index_id", index_id);
return;
}
versioned_put(txn.get(), index_meta_key, "");
txn->put(index_inverted_key, "");
txn->put(index_index_key, index_index_value);
commit_index_log.add_index_ids(index_id);
}
}
// Save the partition meta keys
if (is_versioned_write) {
for (auto partition_id : request->partition_ids()) {
int64_t db_id = request->db_id();
int64_t table_id = request->table_id();
std::string part_meta_key = versioned::meta_partition_key({instance_id, partition_id});
std::string part_index_key =
versioned::partition_index_key({instance_id, partition_id});
std::string part_inverted_index_key = versioned::partition_inverted_index_key(
{instance_id, db_id, table_id, partition_id});
PartitionIndexPB part_index_pb;
part_index_pb.set_db_id(db_id);
part_index_pb.set_table_id(table_id);
std::string part_index_value;
if (!part_index_pb.SerializeToString(&part_index_value)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
msg = fmt::format("failed to serialize PartitionIndexPB");
LOG_WARNING(msg).tag("part_id", partition_id);
return;
}
versioned_put(txn.get(), part_meta_key, "");
txn->put(part_inverted_index_key, "");
txn->put(part_index_key, part_index_value);
LOG(INFO) << "xxx put versioned partition index key=" << hex(part_index_key)
<< " partition_id=" << partition_id;
commit_index_log.add_partition_ids(partition_id);
}
}
if (request->has_is_new_table() && request->is_new_table()) {
if (is_versioned_read) {
// Read the table version, to build the operation log visible version range.
Versionstamp table_version;
err = reader.get_table_version(txn.get(), request->table_id(), &table_version, true);
if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
code = cast_as<ErrCategory::READ>(err);
msg = fmt::format("failed to get table version, err={}", err);
return;
}
}
// init table version, for create and truncate table
update_table_version(txn.get(), instance_id, request->db_id(), request->table_id());
commit_index_log.set_update_table_version(true);
}
if (commit_index_log.index_ids_size() > 0 && is_version_write_enabled(instance_id)) {
std::string operation_log_key = versioned::log_key({instance_id});
OperationLogPB operation_log;
if (is_versioned_read) {
operation_log.set_min_timestamp(reader.min_read_version());
}
operation_log.mutable_commit_index()->Swap(&commit_index_log);
versioned::blob_put(txn.get(), operation_log_key, operation_log);
LOG_INFO("put commit index operation log key")
.tag("instance_id", instance_id)
.tag("table_id", request->table_id())
.tag("operation_log_key", hex(operation_log_key));
}
err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::COMMIT>(err);
msg = fmt::format("failed to commit txn: {}", err);
return;
}
}
void MetaServiceImpl::drop_index(::google::protobuf::RpcController* controller,
const IndexRequest* request, IndexResponse* response,
::google::protobuf::Closure* done) {
RPC_PREPROCESS(drop_index, get, put);
instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
if (instance_id.empty()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "empty instance_id";
return;
}
RPC_RATE_LIMIT(drop_index)
if (request->index_ids().empty() || !request->has_table_id()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "empty index_ids or table_id";
return;
}
TxnErrorCode err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::CREATE>(err);
msg = "failed to create txn";
return;
}
std::string to_save_val;
{
RecycleIndexPB pb;
pb.set_db_id(request->db_id());
pb.set_table_id(request->table_id());
pb.set_creation_time(::time(nullptr));
pb.set_expiration(request->expiration());
pb.set_state(RecycleIndexPB::DROPPED);
pb.SerializeToString(&to_save_val);
}
bool need_commit = false;
bool is_versioned_write = is_version_write_enabled(instance_id);
bool is_versioned_read = is_version_read_enabled(instance_id);
if (is_versioned_write && !request->has_db_id()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "missing db_id for versioned write, please upgrade your FE version";
return;
}
DropIndexLogPB drop_index_log;
drop_index_log.set_db_id(request->db_id());
drop_index_log.set_table_id(request->table_id());
drop_index_log.set_expiration(request->expiration());
CloneChainReader reader(instance_id, resource_mgr_.get());
for (auto index_id : request->index_ids()) {
auto key = recycle_index_key({instance_id, index_id});
std::string val;
err = txn->get(key, &val);
if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { // UNKNOWN
if (is_versioned_write) {
drop_index_log.add_index_ids(index_id);
if (is_versioned_read) {
// Read the index version, to build the operation log visible version range.
err = reader.is_index_exists(txn.get(), index_id);
if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
code = cast_as<ErrCategory::READ>(err);
msg = fmt::format("failed to check index existence, err={}", err);
LOG_WARNING(msg);
return;
}
}
} else {
LOG_INFO("put recycle index").tag("key", hex(key));
txn->put(key, to_save_val);
}
need_commit = true;
continue;
}
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
msg = fmt::format("failed to get kv, err={}", err);
LOG_WARNING(msg);
return;
}
RecycleIndexPB pb;
if (!pb.ParseFromString(val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
msg = "malformed recycle index value";
LOG_WARNING(msg).tag("index_id", index_id);
return;
}
switch (pb.state()) {
case RecycleIndexPB::PREPARED:
LOG_INFO("put recycle index").tag("key", hex(key));
txn->put(key, to_save_val);
need_commit = true;
break;
case RecycleIndexPB::DROPPED:
case RecycleIndexPB::RECYCLING:
break;
default:
code = MetaServiceCode::INVALID_ARGUMENT;
msg = fmt::format("invalid recycle index state: {}",
RecycleIndexPB::State_Name(pb.state()));
return;
}
}
if (!need_commit) return;
if (drop_index_log.index_ids_size() > 0 && is_versioned_write) {
std::string operation_log_key = versioned::log_key({instance_id});
OperationLogPB operation_log;
if (is_versioned_read) {
operation_log.set_min_timestamp(reader.min_read_version());
}
operation_log.mutable_drop_index()->Swap(&drop_index_log);
versioned::blob_put(txn.get(), operation_log_key, operation_log);
LOG(INFO) << "put drop index operation log"
<< " instance_id=" << instance_id << " table_id=" << request->table_id()
<< " index_ids=" << drop_index_log.index_ids_size();
}
err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::COMMIT>(err);
msg = fmt::format("failed to commit txn: {}", err);
return;
}
}
// Return TXN_OK if exists, TXN_KEY_NOT_FOUND if not exists, otherwise error
static TxnErrorCode partition_exists(Transaction* txn, const std::string& instance_id,
const PartitionRequest* req) {
auto tablet_key = meta_tablet_key(
{instance_id, req->table_id(), req->index_ids(0), req->partition_ids(0), 0});
auto tablet_key_end = meta_tablet_key(
{instance_id, req->table_id(), req->index_ids(0), req->partition_ids(0), INT64_MAX});
std::unique_ptr<RangeGetIterator> it;
TxnErrorCode err = txn->get(tablet_key, tablet_key_end, &it, false, 1);
if (err != TxnErrorCode::TXN_OK) {
LOG_WARNING("failed to get kv").tag("err", err);
return err;
}
return it->has_next() ? TxnErrorCode::TXN_OK : TxnErrorCode::TXN_KEY_NOT_FOUND;
}
void MetaServiceImpl::prepare_partition(::google::protobuf::RpcController* controller,
const PartitionRequest* request,
PartitionResponse* response,
::google::protobuf::Closure* done) {
RPC_PREPROCESS(prepare_partition, get, put);
instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
if (instance_id.empty()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "empty instance_id";
return;
}
AnnotateTag tag_instance_id("instance_id", instance_id);
RPC_RATE_LIMIT(prepare_partition)
if (request->partition_ids().empty() || request->index_ids().empty() ||
!request->has_table_id()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "empty partition_ids or index_ids or table_id";
return;
}
if (request->partition_versions_size() > 0 &&
(request->partition_versions_size() != request->partition_ids_size())) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "size is not equal, partition_versions size=" +
std::to_string(request->partition_ids_size()) +
" partition_ids size=" + std::to_string(request->partition_ids_size());
return;
}
TxnErrorCode err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::CREATE>(err);
msg = "failed to create txn";
return;
}
bool is_versioned_read = is_version_read_enabled(instance_id);
if (!is_versioned_read) {
err = partition_exists(txn.get(), instance_id, request);
} else {
CloneChainReader reader(instance_id, resource_mgr_.get());
err = reader.is_partition_exists(txn.get(), request->partition_ids(0));
}
// If index has existed, this might be a stale request
if (err == TxnErrorCode::TXN_OK) {
code = MetaServiceCode::ALREADY_EXISTED;
msg = "index already existed";
return;
}
if (err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
code = cast_as<ErrCategory::READ>(err);
msg = fmt::format("failed to check index existence, err={}", err);
return;
}
std::string to_save_val;
{
RecyclePartitionPB pb;
if (request->db_id() > 0) pb.set_db_id(request->db_id());
pb.set_table_id(request->table_id());
*pb.mutable_index_id() = request->index_ids();
pb.set_creation_time(::time(nullptr));
pb.set_expiration(request->expiration());
pb.set_state(RecyclePartitionPB::PREPARED);
pb.SerializeToString(&to_save_val);
}
for (int i = 0; i < request->partition_ids_size(); i++) {
auto key = recycle_partition_key({instance_id, request->partition_ids(i)});
std::string val;
err = txn->get(key, &val);
if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { // UNKNOWN
LOG_INFO("put recycle partition").tag("key", hex(key));
txn->put(key, to_save_val);
// save partition version
if (request->partition_versions_size() > 0 && request->partition_versions(i) > 1) {
int64_t version_update_time_ms =
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
std::string ver_key;
std::string ver_val;
partition_version_key({instance_id, request->db_id(), request->table_id(),
request->partition_ids(i)},
&ver_key);
VersionPB version_pb;
version_pb.set_version(request->partition_versions(i));
version_pb.set_update_time_ms(version_update_time_ms);
if (!version_pb.SerializeToString(&ver_val)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
ss << "failed to serialize version_pb when saving.";
msg = ss.str();
return;
}
txn->put(ver_key, ver_val);
LOG_INFO("put partition_version_key")
.tag("key", hex(ver_key))
.tag("partition_id", request->partition_ids(i))
.tag("version", request->partition_versions(i));
}
continue;
}
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
msg = fmt::format("failed to get kv, err={}", err);
LOG_WARNING(msg);
return;
}
RecyclePartitionPB pb;
if (!pb.ParseFromString(val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
msg = "malformed recycle partition value";
LOG_WARNING(msg).tag("partition_id", request->partition_ids(i));
return;
}
if (pb.state() != RecyclePartitionPB::PREPARED) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = fmt::format("invalid recycle index state: {}",
RecyclePartitionPB::State_Name(pb.state()));
return;
}
// else, duplicate request, OK
}
err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::COMMIT>(err);
msg = fmt::format("failed to commit txn: {}", err);
return;
}
}
void MetaServiceImpl::commit_partition(::google::protobuf::RpcController* controller,
const PartitionRequest* request, PartitionResponse* response,
::google::protobuf::Closure* done) {
RPC_PREPROCESS(commit_partition, get, put, del);
instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
if (instance_id.empty()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "empty instance_id";
return;
}
RPC_RATE_LIMIT(commit_partition)
if (request->partition_ids().empty() || !request->has_table_id()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "empty partition_ids or index_ids or table_id";
return;
}
constexpr size_t BATCH_COMMIT_SIZE = 1000;
for (size_t i = 0; i < request->partition_ids_size(); i += BATCH_COMMIT_SIZE) {
std::vector<int64_t> partition_ids;
size_t end = std::min(i + BATCH_COMMIT_SIZE, (size_t)request->partition_ids_size());
for (size_t j = i; j < end; j++) {
partition_ids.push_back(request->partition_ids(j));
}
commit_partition_internal(request, instance_id, partition_ids, code, msg, stats);
if (code != MetaServiceCode::OK) {
return;
}
}
}
void MetaServiceImpl::commit_partition_internal(const PartitionRequest* request,
const std::string& instance_id,
const std::vector<int64_t>& partition_ids,
MetaServiceCode& code, std::string& msg,
KVStats& stats) {
std::unique_ptr<Transaction> txn;
TxnErrorCode err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::CREATE>(err);
msg = "failed to create txn";
return;
}
DORIS_CLOUD_DEFER {
if (txn) {
stats.get_bytes += txn->get_bytes();
stats.put_bytes += txn->put_bytes();
stats.del_bytes += txn->delete_bytes();
stats.get_counter += txn->num_get_keys();
stats.put_counter += txn->num_put_keys();
stats.del_counter += txn->num_del_keys();
}
};
CommitPartitionLogPB commit_partition_log;
commit_partition_log.set_db_id(request->db_id());
commit_partition_log.set_table_id(request->table_id());
commit_partition_log.mutable_index_ids()->CopyFrom(request->index_ids());
bool is_versioned_read = is_version_read_enabled(instance_id);
bool is_versioned_write = is_version_write_enabled(instance_id);
if (is_versioned_write && !request->has_db_id()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "missing db_id for versioned write, please upgrade your FE version";
return;
}
CloneChainReader reader(instance_id, resource_mgr_.get());
size_t num_commit = 0;
for (auto part_id : partition_ids) {
auto key = recycle_partition_key({instance_id, part_id});
std::string val;
err = txn->get(key, &val);
if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { // UNKNOWN
// Compatible with requests without `index_ids`
if (!request->index_ids().empty()) {
if (!is_versioned_read) {
err = partition_exists(txn.get(), instance_id, request);
} else {
err = reader.is_partition_exists(txn.get(), part_id);
}
// If partition has existed, this might be a duplicate request
if (err == TxnErrorCode::TXN_OK) {
// Partition committed, OK
continue;
}
if (err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
code = cast_as<ErrCategory::READ>(err);
msg = fmt::format("failed to check partition existence, err={}", err);
return;
}
}
// Index recycled
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "partition has been recycled";
return;
}
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
msg = fmt::format("failed to get kv, err={}", err);
LOG_WARNING(msg);
return;
}
RecyclePartitionPB pb;
if (!pb.ParseFromString(val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
msg = "malformed recycle partition value";
LOG_WARNING(msg).tag("partition_id", part_id);
return;
}
if (pb.state() != RecyclePartitionPB::PREPARED) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = fmt::format("invalid recycle partition state: {}",
RecyclePartitionPB::State_Name(pb.state()));
return;
}
LOG_INFO("remove recycle partition").tag("key", hex(key));
txn->remove(key);
num_commit += 1;
// Save the partition meta/index keys
if (is_versioned_write) {
int64_t db_id = request->db_id();
int64_t table_id = request->table_id();
std::string part_meta_key = versioned::meta_partition_key({instance_id, part_id});
std::string part_index_key = versioned::partition_index_key({instance_id, part_id});
std::string part_inverted_index_key = versioned::partition_inverted_index_key(
{instance_id, db_id, table_id, part_id});
PartitionIndexPB part_index_pb;
part_index_pb.set_db_id(db_id);
part_index_pb.set_table_id(table_id);
LOG(INFO) << part_index_pb.DebugString();
std::string part_index_value;
if (!part_index_pb.SerializeToString(&part_index_value)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
msg = fmt::format("failed to serialize PartitionIndexPB");
LOG_WARNING(msg).tag("part_id", part_id);
return;
}
versioned_put(txn.get(), part_meta_key, "");
txn->put(part_inverted_index_key, "");
txn->put(part_index_key, part_index_value);
commit_partition_log.add_partition_ids(part_id);
}
}
if (num_commit == 0) {
// All partitions have been committed, OK
return;
}
// update table versions
if (is_versioned_read) {
// Read the table version, to build the operation log visible version range.
Versionstamp table_version;
err = reader.get_table_version(txn.get(), request->table_id(), &table_version, true);
if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
code = cast_as<ErrCategory::READ>(err);
msg = fmt::format("failed to get table version, err={}", err);
return;
}
}
update_table_version(txn.get(), instance_id, request->db_id(), request->table_id());
if (commit_partition_log.partition_ids_size() > 0 && is_version_write_enabled(instance_id)) {
std::string operation_log_key = versioned::log_key({instance_id});
OperationLogPB operation_log;
if (is_versioned_read) {
operation_log.set_min_timestamp(reader.min_read_version());
}
operation_log.mutable_commit_partition()->Swap(&commit_partition_log);
versioned::blob_put(txn.get(), operation_log_key, operation_log);
LOG_INFO("put commit partition operation log key")
.tag("instance_id", instance_id)
.tag("table_id", request->table_id())
.tag("num_partitions", operation_log.commit_partition().partition_ids_size())
.tag("operation_log_key", hex(operation_log_key));
}
err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::COMMIT>(err);
msg = fmt::format("failed to commit txn: {}", err);
return;
}
}
void MetaServiceImpl::drop_partition(::google::protobuf::RpcController* controller,
const PartitionRequest* request, PartitionResponse* response,
::google::protobuf::Closure* done) {
RPC_PREPROCESS(drop_partition, get, put);
instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
if (instance_id.empty()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "empty instance_id";
return;
}
RPC_RATE_LIMIT(drop_partition)
if (request->partition_ids().empty() || request->index_ids().empty() ||
!request->has_table_id()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "empty partition_ids or index_ids or table_id";
return;
}
if (!request->has_db_id()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "missing db_id for drop_partition";
return;
}
TxnErrorCode err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
msg = "failed to create txn";
return;
}
std::string to_save_val;
{
RecyclePartitionPB pb;
if (request->db_id() > 0) pb.set_db_id(request->db_id());
pb.set_table_id(request->table_id());
*pb.mutable_index_id() = request->index_ids();
pb.set_creation_time(::time(nullptr));
pb.set_expiration(request->expiration());
pb.set_state(RecyclePartitionPB::DROPPED);
pb.SerializeToString(&to_save_val);
}
bool need_commit = false;
bool is_versioned_write = is_version_write_enabled(instance_id);
bool is_versioned_read = is_version_read_enabled(instance_id);
DropPartitionLogPB drop_partition_log;
drop_partition_log.set_db_id(request->db_id());
drop_partition_log.set_table_id(request->table_id());
drop_partition_log.mutable_index_ids()->CopyFrom(request->index_ids());
drop_partition_log.set_expired_at_s(request->expiration());
CloneChainReader reader(instance_id, resource_mgr_.get());
for (auto part_id : request->partition_ids()) {
auto key = recycle_partition_key({instance_id, part_id});
std::string val;
err = txn->get(key, &val);
if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { // UNKNOWN
if (is_versioned_write) {
drop_partition_log.add_partition_ids(part_id);
if (is_versioned_read) {
// Read the partition version, to build the operation log visible version range.
err = reader.is_partition_exists(txn.get(), part_id);
if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
code = cast_as<ErrCategory::READ>(err);
msg = fmt::format("failed to check partition existence, err={}", err);
LOG_WARNING(msg);
return;
}
}
} else {
LOG_INFO("put recycle partition").tag("key", hex(key));
txn->put(key, to_save_val);
}
need_commit = true;
continue;
}
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
msg = fmt::format("failed to get kv, err={}", err);
LOG_WARNING(msg);
return;
}
RecyclePartitionPB pb;
if (!pb.ParseFromString(val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
msg = "malformed recycle partition value";
LOG_WARNING(msg).tag("partition_id", part_id);
return;
}
switch (pb.state()) {
case RecyclePartitionPB::PREPARED:
LOG_INFO("put recycle partition").tag("key", hex(key));
txn->put(key, to_save_val);
need_commit = true;
break;
case RecyclePartitionPB::DROPPED:
case RecyclePartitionPB::RECYCLING:
break;
default:
code = MetaServiceCode::INVALID_ARGUMENT;
msg = fmt::format("invalid recycle partition state: {}",
RecyclePartitionPB::State_Name(pb.state()));
return;
}
}
if (!need_commit) return;
// Update table version only when deleting non-empty partitions
if (request->has_need_update_table_version() && request->need_update_table_version()) {
if (is_versioned_read) {
// Read the table version, to build the operation log visible version range.
Versionstamp table_version;
err = reader.get_table_version(txn.get(), request->table_id(), &table_version, true);
if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
code = cast_as<ErrCategory::READ>(err);
msg = fmt::format("failed to get table version, err={}", err);
return;
}
}
update_table_version(txn.get(), instance_id, request->db_id(), request->table_id());
drop_partition_log.set_update_table_version(true);
}
if ((drop_partition_log.update_table_version() ||
drop_partition_log.partition_ids_size() > 0) &&
is_versioned_write) {
std::string operation_log_key = versioned::log_key({instance_id});
OperationLogPB operation_log;
if (is_versioned_read) {
operation_log.set_min_timestamp(reader.min_read_version());
}
operation_log.mutable_drop_partition()->Swap(&drop_partition_log);
versioned::blob_put(txn.get(), operation_log_key, operation_log);
LOG(INFO) << "put drop partition operation log"
<< " instance_id=" << instance_id << " table_id=" << request->table_id()
<< " partition_ids=" << drop_partition_log.partition_ids_size();
}
err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::COMMIT>(err);
msg = fmt::format("failed to commit txn: {}", err);
return;
}
}
void check_create_table(std::string instance_id, std::shared_ptr<TxnKv> txn_kv,
const CheckKVRequest* request, CheckKVResponse* response,
MetaServiceCode* code, std::string* msg, KVStats& stats,
check_create_table_type get_check_info) {
StopWatch watch;
std::unique_ptr<Transaction> txn;
TxnErrorCode err = txn_kv->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
*code = cast_as<ErrCategory::READ>(err);
*msg = "failed to create txn";
return;
}
DORIS_CLOUD_DEFER {
if (txn == nullptr) return;
stats.get_bytes += txn->get_bytes();
stats.get_counter += txn->num_get_keys();
};
auto& [keys, hint, key_func] = get_check_info(request);
if (keys.empty()) {
*code = MetaServiceCode::INVALID_ARGUMENT;
*msg = "empty keys";
return;
}
for (int i = 0; i < keys.size();) {
auto key = key_func(instance_id, keys.Get(i));
err = check_recycle_key_exist(txn.get(), key);
if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
i++;
continue;
} else if (err == TxnErrorCode::TXN_OK) {
// find not match, prepare commit
*code = MetaServiceCode::UNDEFINED_ERR;
*msg = "prepare and commit rpc not match, recycle key remained";
return;
} else if (err == TxnErrorCode::TXN_TOO_OLD) {
stats.get_bytes += txn->get_bytes();
stats.get_counter += txn->num_get_keys();
// separate it to several txn for rubustness
txn.reset();
TxnErrorCode err = txn_kv->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
*code = cast_as<ErrCategory::READ>(err);
*msg = "failed to create txn in cycle";
return;
}
LOG_INFO("meet txn too long err, gen a new txn, and retry, size={} idx={}", keys.size(),
i);
bthread_usleep(50);
continue;
} else {
// err != TXN_OK, fdb read err
*code = cast_as<ErrCategory::READ>(err);
*msg = fmt::format("ms read key error: {}", err);
return;
}
}
LOG_INFO("check {} success key.size={}, cost(us)={}", hint, keys.size(), watch.elapsed_us());
}
void MetaServiceImpl::check_kv(::google::protobuf::RpcController* controller,
const CheckKVRequest* request, CheckKVResponse* response,
::google::protobuf::Closure* done) {
RPC_PREPROCESS(check_kv, get);
instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
if (instance_id.empty()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "empty instance_id";
return;
}
if (!request->has_op()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "op not given";
return;
}
if (!request->has_check_keys()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "empty check keys";
return;
}
RPC_RATE_LIMIT(check_kv);
switch (request->op()) {
case CheckKVRequest::CREATE_INDEX_AFTER_FE_COMMIT: {
check_create_table(instance_id, txn_kv_, request, response, &code, &msg, stats,
[](const CheckKVRequest* request) {
return std::make_tuple(
request->check_keys().index_ids(), "index",
[](std::string instance_id, int64_t id) {
return recycle_index_key({std::move(instance_id), id});
});
});
break;
}
case CheckKVRequest::CREATE_PARTITION_AFTER_FE_COMMIT: {
check_create_table(
instance_id, txn_kv_, request, response, &code, &msg, stats,
[](const CheckKVRequest* request) {
return std::make_tuple(
request->check_keys().partition_ids(), "partition",
[](std::string instance_id, int64_t id) {
return recycle_partition_key({std::move(instance_id), id});
});
});
break;
}
default:
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "not support op";
return;
};
}
} // namespace doris::cloud