blob: 2494e91c241e248503c71d25364e741393f93923 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <brpc/closure_guard.h>
#include <brpc/controller.h>
#include <fmt/core.h>
#include <gen_cpp/cloud.pb.h>
#include <gen_cpp/olap_file.pb.h>
#include <glog/logging.h>
#include <algorithm>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <sstream>
#include "common/bvars.h"
#include "common/config.h"
#include "common/logging.h"
#include "common/stats.h"
#include "common/util.h"
#include "cpp/sync_point.h"
#include "meta-service/meta_service.h"
#include "meta-service/meta_service_helper.h"
#include "meta-service/meta_service_tablet_stats.h"
#include "meta-store/blob_message.h"
#include "meta-store/clone_chain_reader.h"
#include "meta-store/document_message.h"
#include "meta-store/keys.h"
#include "meta-store/meta_reader.h"
#include "meta-store/txn_kv.h"
#include "meta-store/txn_kv_error.h"
#include "meta-store/versioned_value.h"
#include "meta_service.h"
#include "resource-manager/resource_manager.h"
// Empty string not is not processed
template <typename T, size_t S>
static inline constexpr size_t get_file_name_offset(const T (&s)[S], size_t i = S - 1) {
return (s[i] == '/' || s[i] == '\\') ? i + 1 : (i > 0 ? get_file_name_offset(s, i - 1) : 0);
}
#define SS (ss << &__FILE__[get_file_name_offset(__FILE__)] << ":" << __LINE__ << " ")
#define INSTANCE_LOG(severity) (LOG(severity) << '(' << instance_id << ')')
namespace doris::cloud {
// check compaction input_versions are valid during schema change.
// If the schema change job doesnt have alter version, it dont need to check
// because the schema change job is come from old version BE.
// we will check they in prepare compaction and commit compaction.
// 1. When if base compaction, we need to guarantee the end version
// is less than or equal to alter_version.
// 2. When if cu compaction, we need to guarantee the start version
// is large than alter_version.
bool check_compaction_input_verions(const TabletCompactionJobPB& compaction,
const TabletJobInfoPB& job_pb, std::stringstream& ss) {
if (!job_pb.has_schema_change() || !job_pb.schema_change().has_alter_version()) {
return true;
}
if (compaction.type() == TabletCompactionJobPB::EMPTY_CUMULATIVE ||
compaction.type() == TabletCompactionJobPB::STOP_TOKEN) {
return true;
}
if (compaction.input_versions_size() != 2 ||
compaction.input_versions(0) > compaction.input_versions(1)) {
SS << "The compaction need to know [start_version, end_version], and the start_version "
"should LE end_version. \n"
<< "compaction job=" << proto_to_json(compaction);
return false;
}
int64_t alter_version = job_pb.schema_change().alter_version();
bool legal = (compaction.type() == TabletCompactionJobPB::BASE &&
compaction.input_versions(1) <= alter_version) ||
(compaction.type() == TabletCompactionJobPB::CUMULATIVE &&
compaction.input_versions(0) > alter_version);
if (legal) {
return true;
}
SS << "Check compaction input versions failed in schema change. input_version_start="
<< compaction.input_versions(0) << " input_version_end=" << compaction.input_versions(1)
<< " schema_change_alter_version=" << job_pb.schema_change().alter_version();
return false;
}
void start_compaction_job(MetaServiceCode& code, std::string& msg, std::stringstream& ss,
std::unique_ptr<Transaction>& txn, const StartTabletJobRequest* request,
StartTabletJobResponse* response, std::string& instance_id,
bool& need_commit, bool is_versioned_read,
ResourceManager* resource_mgr) {
auto& compaction = request->job().compaction(0);
if (!compaction.has_id() || compaction.id().empty()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "no job id specified";
return;
}
// check compaction_cnt to avoid compact on expired tablet cache
if (!compaction.has_base_compaction_cnt() || !compaction.has_cumulative_compaction_cnt()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "no valid compaction_cnt given";
return;
}
if (compaction.expiration() <= 0 &&
compaction.type() != TabletCompactionJobPB::EMPTY_CUMULATIVE) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "no valid expiration given";
return;
}
if (compaction.lease() <= 0) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "no valid lease given";
return;
}
int64_t table_id = request->job().idx().table_id();
int64_t index_id = request->job().idx().index_id();
int64_t partition_id = request->job().idx().partition_id();
int64_t tablet_id = request->job().idx().tablet_id();
TabletStatsPB stats;
if (!is_versioned_read) {
std::string stats_key =
stats_tablet_key({instance_id, table_id, index_id, partition_id, tablet_id});
std::string stats_val;
TxnErrorCode err = txn->get(stats_key, &stats_val);
if (err != TxnErrorCode::TXN_OK) {
code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TABLET_NOT_FOUND
: cast_as<ErrCategory::READ>(err);
SS << (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? "not found" : "get kv error")
<< " when get tablet stats, tablet_id=" << tablet_id << " key=" << hex(stats_key)
<< " err=" << err;
msg = ss.str();
return;
}
CHECK(stats.ParseFromString(stats_val));
} else {
CloneChainReader reader(instance_id, resource_mgr);
TxnErrorCode err = reader.get_tablet_compact_stats(txn.get(), tablet_id, &stats, nullptr);
if (err != TxnErrorCode::TXN_OK) {
code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TABLET_NOT_FOUND
: cast_as<ErrCategory::READ>(err);
SS << (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? "not found" : "get kv error")
<< " when get tablet compact stats, tablet_id=" << tablet_id << " err=" << err;
msg = ss.str();
return;
}
}
if (compaction.base_compaction_cnt() < stats.base_compaction_cnt() ||
compaction.cumulative_compaction_cnt() < stats.cumulative_compaction_cnt()) {
code = MetaServiceCode::STALE_TABLET_CACHE;
SS << "could not perform compaction on expired tablet cache."
<< " req_base_compaction_cnt=" << compaction.base_compaction_cnt()
<< ", base_compaction_cnt=" << stats.base_compaction_cnt()
<< ", req_cumulative_compaction_cnt=" << compaction.cumulative_compaction_cnt()
<< ", cumulative_compaction_cnt=" << stats.cumulative_compaction_cnt();
msg = ss.str();
return;
}
auto job_key = job_tablet_key({instance_id, table_id, index_id, partition_id, tablet_id});
std::string job_val;
TabletJobInfoPB job_pb;
TxnErrorCode err = txn->get(job_key, &job_val);
if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
SS << "failed to get tablet job, instance_id=" << instance_id << " tablet_id=" << tablet_id
<< " key=" << hex(job_key) << " err=" << err;
msg = ss.str();
code = cast_as<ErrCategory::READ>(err);
return;
}
while (err == TxnErrorCode::TXN_OK) {
job_pb.ParseFromString(job_val);
if (!check_compaction_input_verions(compaction, job_pb, ss)) {
msg = ss.str();
INSTANCE_LOG(INFO) << msg;
code = MetaServiceCode::JOB_CHECK_ALTER_VERSION;
response->set_alter_version(job_pb.schema_change().alter_version());
return;
}
if (job_pb.compaction().empty()) {
break;
}
auto& compactions = *job_pb.mutable_compaction();
// Remove expired compaction jobs
// clang-format off
int64_t now = time(nullptr);
compactions.erase(std::remove_if(compactions.begin(), compactions.end(), [&](auto& c) {
DCHECK(c.expiration() > 0 || c.type() == TabletCompactionJobPB::EMPTY_CUMULATIVE) << proto_to_json(c);
DCHECK(c.lease() > 0) << proto_to_json(c);
if (c.expiration() > 0 && c.expiration() < now) {
INSTANCE_LOG(INFO)
<< "got an expired job. job=" << proto_to_json(c) << " now=" << now;
return true;
}
if (c.lease() > 0 && c.lease() < now) {
INSTANCE_LOG(INFO)
<< "got a job exceeding lease. job=" << proto_to_json(c) << " now=" << now;
return true;
}
return false;
}), compactions.end());
// clang-format on
// Check conflict job
if (std::ranges::any_of(compactions, [](const auto& c) {
return c.type() == TabletCompactionJobPB::STOP_TOKEN;
})) {
auto it = std::ranges::find_if(compactions, [](const auto& c) {
return c.type() == TabletCompactionJobPB::STOP_TOKEN;
});
msg = fmt::format(
"compactions are not allowed on tablet_id={} currently, blocked by schema "
"change job delete_bitmap_initiator={}",
tablet_id, it->delete_bitmap_lock_initiator());
code = MetaServiceCode::JOB_TABLET_BUSY;
return;
}
if (compaction.type() == TabletCompactionJobPB::FULL) {
// Full compaction is generally used for data correctness repair
// for MOW table, so priority should be given to performing full
// compaction operations and canceling other types of compaction.
compactions.Clear();
} else if (compaction.type() == TabletCompactionJobPB::STOP_TOKEN) {
// fail all existing compactions
compactions.Clear();
} else if ((!compaction.has_check_input_versions_range() &&
compaction.input_versions().empty()) ||
(compaction.has_check_input_versions_range() &&
!compaction.check_input_versions_range())) {
// Unknown input version range, doesn't support parallel compaction of same type
for (auto& c : compactions) {
if (c.type() != compaction.type() && c.type() != TabletCompactionJobPB::FULL)
continue;
if (c.id() == compaction.id()) return; // Same job, return OK to keep idempotency
msg = fmt::format("compaction has already started, tablet_id={} job={}", tablet_id,
proto_to_json(c));
code = MetaServiceCode::JOB_TABLET_BUSY;
return;
}
} else {
DCHECK_EQ(compaction.input_versions_size(), 2) << proto_to_json(compaction);
DCHECK_LE(compaction.input_versions(0), compaction.input_versions(1))
<< proto_to_json(compaction);
auto version_not_conflict = [](const TabletCompactionJobPB& a,
const TabletCompactionJobPB& b) {
return a.input_versions(0) > b.input_versions(1) ||
a.input_versions(1) < b.input_versions(0);
};
for (auto& c : compactions) {
if (c.type() != compaction.type() && c.type() != TabletCompactionJobPB::FULL)
continue;
if (c.input_versions_size() > 0 && version_not_conflict(c, compaction)) continue;
if (c.id() == compaction.id()) return; // Same job, return OK to keep idempotency
msg = fmt::format("compaction has already started, tablet_id={} job={}", tablet_id,
proto_to_json(c));
code = MetaServiceCode::JOB_TABLET_BUSY;
// Unknown version range of started compaction, BE should not retry other version range
if (c.input_versions_size() == 0) return;
// Notify version ranges in started compaction to BE, so BE can retry other version range
for (auto& c : compactions) {
if (c.type() == compaction.type() || c.type() == TabletCompactionJobPB::FULL) {
// If there are multiple started compaction of same type, they all must has input version range
DCHECK_EQ(c.input_versions_size(), 2) << proto_to_json(c);
response->add_version_in_compaction(c.input_versions(0));
response->add_version_in_compaction(c.input_versions(1));
}
}
return;
}
}
break;
}
if (!job_pb.has_idx()) {
job_pb.mutable_idx()->CopyFrom(request->job().idx());
}
job_pb.add_compaction()->CopyFrom(compaction);
job_pb.SerializeToString(&job_val);
if (job_val.empty()) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
msg = "pb serialization error";
return;
}
INSTANCE_LOG(INFO) << "compaction job to save job=" << proto_to_json(compaction);
txn->put(job_key, job_val);
need_commit = true;
}
void start_schema_change_job(MetaServiceCode& code, std::string& msg, std::stringstream& ss,
std::unique_ptr<Transaction>& txn,
const StartTabletJobRequest* request, StartTabletJobResponse* response,
std::string& instance_id, bool& need_commit, bool is_versioned_read,
ResourceManager* resource_mgr) {
auto& schema_change = request->job().schema_change();
if (!schema_change.has_id() || schema_change.id().empty()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "no job id specified";
return;
}
if (!schema_change.has_initiator()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "no initiator specified";
return;
}
// check new_tablet state
int64_t new_tablet_id = schema_change.new_tablet_idx().tablet_id();
if (new_tablet_id <= 0) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "no valid new_tablet_id given";
return;
}
int64_t table_id = request->job().idx().table_id();
int64_t index_id = request->job().idx().index_id();
int64_t partition_id = request->job().idx().partition_id();
int64_t tablet_id = request->job().idx().tablet_id();
if (new_tablet_id == tablet_id) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "not allow new_tablet_id same with base_tablet_id";
return;
}
CloneChainReader reader(instance_id, resource_mgr);
auto& new_tablet_idx = const_cast<TabletIndexPB&>(schema_change.new_tablet_idx());
if (!new_tablet_idx.has_table_id() || !new_tablet_idx.has_index_id() ||
!new_tablet_idx.has_partition_id()) {
if (!is_versioned_read) {
get_tablet_idx(code, msg, txn.get(), instance_id, new_tablet_id, new_tablet_idx);
if (code != MetaServiceCode::OK) return;
} else {
TxnErrorCode err = reader.get_tablet_index(txn.get(), tablet_id, &new_tablet_idx);
if (err != TxnErrorCode::TXN_OK) {
code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TABLET_NOT_FOUND
: cast_as<ErrCategory::READ>(err);
SS << "failed to get new tablet index, instance_id=" << instance_id
<< " tablet_id=" << new_tablet_id << " err=" << err;
msg = ss.str();
return;
}
}
}
doris::TabletMetaCloudPB new_tablet_meta;
if (!is_versioned_read) {
std::string new_tablet_key =
meta_tablet_key({instance_id, new_tablet_idx.table_id(), new_tablet_idx.index_id(),
new_tablet_idx.partition_id(), new_tablet_id});
std::string new_tablet_val;
TxnErrorCode err = txn->get(new_tablet_key, &new_tablet_val);
if (err != TxnErrorCode::TXN_OK) {
SS << "failed to get new tablet meta"
<< (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? " (not found)" : "")
<< " instance_id=" << instance_id << " tablet_id=" << new_tablet_id
<< " key=" << hex(new_tablet_key) << " err=" << err;
msg = ss.str();
code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TABLET_NOT_FOUND
: cast_as<ErrCategory::READ>(err);
return;
}
if (!new_tablet_meta.ParseFromString(new_tablet_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
msg = "malformed tablet meta";
return;
}
} else {
TxnErrorCode err =
reader.get_tablet_meta(txn.get(), new_tablet_id, &new_tablet_meta, nullptr);
if (err != TxnErrorCode::TXN_OK) {
code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TABLET_NOT_FOUND
: cast_as<ErrCategory::READ>(err);
SS << "failed to get new versioned tablet meta, instance_id=" << instance_id
<< " tablet_id=" << new_tablet_id << " err=" << err;
msg = ss.str();
return;
}
}
if (new_tablet_meta.tablet_state() == doris::TabletStatePB::PB_RUNNING) {
code = MetaServiceCode::JOB_ALREADY_SUCCESS;
msg = "schema_change job already success";
return;
}
if (!new_tablet_meta.has_tablet_state() ||
new_tablet_meta.tablet_state() != doris::TabletStatePB::PB_NOTREADY) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "invalid new tablet state";
return;
}
auto job_key = job_tablet_key({instance_id, table_id, index_id, partition_id, tablet_id});
std::string job_val;
TabletJobInfoPB job_pb;
TxnErrorCode err = txn->get(job_key, &job_val);
if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
SS << "failed to get tablet job, instance_id=" << instance_id << " tablet_id=" << tablet_id
<< " key=" << hex(job_key) << " err=" << err;
msg = ss.str();
code = cast_as<ErrCategory::READ>(err);
return;
}
if (!job_pb.ParseFromString(job_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
msg = "pb deserialization failed";
return;
}
if (job_pb.has_schema_change() && job_pb.schema_change().has_alter_version() &&
job_pb.schema_change().id() == schema_change.id() &&
job_pb.schema_change().initiator() == schema_change.initiator()) {
TEST_SYNC_POINT_CALLBACK("restart_compaction_job");
response->set_alter_version(job_pb.schema_change().alter_version());
return;
}
job_pb.mutable_idx()->CopyFrom(request->job().idx());
// FE can ensure that a tablet does not have more than one schema_change job at the same time,
// so we can directly preempt previous schema_change job.
job_pb.mutable_schema_change()->CopyFrom(schema_change);
job_pb.SerializeToString(&job_val);
if (job_val.empty()) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
msg = "pb serialization error";
return;
}
TabletJobInfoPB new_tablet_job_pb {job_pb};
std::string new_tablet_job_val;
new_tablet_job_pb.clear_compaction();
new_tablet_job_pb.SerializeToString(&new_tablet_job_val);
if (new_tablet_job_val.empty()) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
msg = "pb serialization error";
return;
}
INSTANCE_LOG(INFO) << "schema_change job to save job=" << proto_to_json(schema_change);
txn->put(job_key, job_val);
auto new_tablet_job_key =
job_tablet_key({instance_id, new_tablet_idx.table_id(), new_tablet_idx.index_id(),
new_tablet_idx.partition_id(), new_tablet_id});
txn->put(new_tablet_job_key, new_tablet_job_val);
response->set_alter_version(job_pb.schema_change().alter_version());
need_commit = true;
}
void MetaServiceImpl::start_tablet_job(::google::protobuf::RpcController* controller,
const StartTabletJobRequest* request,
StartTabletJobResponse* response,
::google::protobuf::Closure* done) {
RPC_PREPROCESS(start_tablet_job, get, put);
std::string cloud_unique_id = request->cloud_unique_id();
instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
if (instance_id.empty()) {
code = MetaServiceCode::INVALID_ARGUMENT;
SS << "cannot find instance_id with cloud_unique_id="
<< (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id);
msg = ss.str();
return;
}
RPC_RATE_LIMIT(start_tablet_job)
if (!request->has_job() ||
(request->job().compaction().empty() && !request->job().has_schema_change())) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "no valid job specified";
return;
}
TxnErrorCode err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::CREATE>(err);
msg = "failed to create txn";
return;
}
int64_t tablet_id = request->job().idx().tablet_id();
if (tablet_id <= 0) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "no valid tablet_id given";
return;
}
auto& tablet_idx = const_cast<TabletIndexPB&>(request->job().idx());
bool is_versioned_read = is_version_read_enabled(instance_id);
if (!tablet_idx.has_table_id() || !tablet_idx.has_index_id() ||
!tablet_idx.has_partition_id()) {
if (!is_versioned_read) {
get_tablet_idx(code, msg, txn.get(), instance_id, tablet_id, tablet_idx);
if (code != MetaServiceCode::OK) return;
} else {
CloneChainReader reader(instance_id, resource_mgr_.get());
err = reader.get_tablet_index(txn.get(), tablet_id, &tablet_idx);
if (err != TxnErrorCode::TXN_OK) {
code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TABLET_NOT_FOUND
: cast_as<ErrCategory::READ>(err);
msg = fmt::format("failed to get tablet index, tablet_id={}, err={}", tablet_id,
err);
LOG(WARNING) << msg;
return;
}
}
}
// Check if tablet has been dropped
if (is_dropped_tablet(txn.get(), instance_id, tablet_idx.index_id(),
tablet_idx.partition_id())) {
code = MetaServiceCode::TABLET_NOT_FOUND;
msg = fmt::format("tablet {} has been dropped", tablet_id);
return;
}
bool need_commit = false;
DORIS_CLOUD_DEFER {
if (!need_commit) return;
TxnErrorCode err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::COMMIT>(err);
ss << "failed to commit job kv, err=" << err;
msg = ss.str();
return;
}
};
if (!request->job().compaction().empty()) {
start_compaction_job(code, msg, ss, txn, request, response, instance_id, need_commit,
is_versioned_read, resource_mgr_.get());
return;
}
if (request->job().has_schema_change()) {
start_schema_change_job(code, msg, ss, txn, request, response, instance_id, need_commit,
is_versioned_read, resource_mgr_.get());
return;
}
}
static bool check_and_remove_delete_bitmap_update_lock(
MetaServiceCode& code, std::string& msg, std::stringstream& ss,
std::unique_ptr<Transaction>& txn, std::string& instance_id, int64_t table_id,
int64_t tablet_id, int64_t lock_id, int64_t lock_initiator, std::string use_version) {
VLOG_DEBUG << "check_and_remove_delete_bitmap_update_lock table_id=" << table_id
<< " tablet_id=" << tablet_id << " lock_id=" << lock_id
<< " initiator=" << lock_initiator << " use_version=" << use_version;
std::string lock_key = meta_delete_bitmap_update_lock_key({instance_id, table_id, -1});
std::string lock_val;
TxnErrorCode err = txn->get(lock_key, &lock_val);
LOG(INFO) << "get delete bitmap update lock info, table_id=" << table_id
<< " tablet_id=" << tablet_id << " lock_id=" << lock_id
<< " initiator=" << lock_initiator << " key=" << hex(lock_key) << " err=" << err;
if (err != TxnErrorCode::TXN_OK) {
ss << "failed to get delete bitmap update lock key, instance_id=" << instance_id
<< " table_id=" << table_id << " key=" << hex(lock_key) << " err=" << err;
msg = ss.str();
code = cast_as<ErrCategory::READ>(err);
return false;
}
DeleteBitmapUpdateLockPB lock_info;
if (!lock_info.ParseFromString(lock_val)) [[unlikely]] {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
msg = "failed to parse DeleteBitmapUpdateLockPB";
return false;
}
if (lock_info.lock_id() != lock_id) {
ss << "lock id not match, locked by lock_id=" << lock_info.lock_id();
msg = ss.str();
code = MetaServiceCode::LOCK_EXPIRED;
return false;
}
if (use_version == "v2" && is_job_delete_bitmap_lock_id(lock_id)) {
// when upgrade ms, prevent old ms get delete bitmap update lock
if (lock_info.initiators_size() > 0) {
ss << "compaction lock has " << lock_info.initiators_size() << " initiators";
msg = ss.str();
code = MetaServiceCode::LOCK_EXPIRED;
return false;
}
std::string tablet_job_key = mow_tablet_job_key({instance_id, table_id, lock_initiator});
std::string tablet_job_val;
err = txn->get(tablet_job_key, &tablet_job_val);
if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
ss << "lock initiator " << lock_initiator << " not exist";
msg = ss.str();
code = MetaServiceCode::LOCK_EXPIRED;
return false;
} else if (err != TxnErrorCode::TXN_OK) {
ss << "failed to get tablet job key, instance_id=" << instance_id
<< " table_id=" << table_id << " tablet_id=" << tablet_id
<< " initiator=" << lock_initiator << " key=" << hex(tablet_job_key)
<< " err=" << err;
msg = ss.str();
code = cast_as<ErrCategory::READ>(err);
return false;
}
txn->remove(tablet_job_key);
INSTANCE_LOG(INFO) << "remove tablet job lock, table_id=" << table_id
<< " tablet_id=" << tablet_id << " lock_id=" << lock_id
<< " initiator=" << lock_initiator << " key=" << hex(tablet_job_key);
// may left a lock key for -1
return true;
} else {
// TODO does not check expired time
bool found = false;
auto initiators = lock_info.mutable_initiators();
for (auto iter = initiators->begin(); iter != initiators->end(); iter++) {
if (*iter == lock_initiator) {
initiators->erase(iter);
found = true;
break;
}
}
if (!found) {
ss << "lock initiator " << lock_initiator << " not exist";
msg = ss.str();
code = MetaServiceCode::LOCK_EXPIRED;
return false;
}
if (initiators->empty()) {
INSTANCE_LOG(INFO) << "remove delete bitmap lock, table_id=" << table_id
<< " tablet_id=" << tablet_id << " lock_id=" << lock_id
<< " key=" << hex(lock_key);
txn->remove(lock_key);
return true;
}
lock_info.SerializeToString(&lock_val);
if (lock_val.empty()) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
msg = "pb serialization error";
return false;
}
INSTANCE_LOG(INFO) << "remove delete bitmap lock initiator, table_id=" << table_id
<< " tablet_id=" << tablet_id << ", key=" << hex(lock_key)
<< " lock_id=" << lock_id << " initiator=" << lock_initiator
<< " initiators_size=" << lock_info.initiators_size();
txn->put(lock_key, lock_val);
}
return true;
}
static void remove_delete_bitmap_update_lock_v1(std::unique_ptr<Transaction>& txn,
const std::string& instance_id, int64_t table_id,
int64_t tablet_id, int64_t lock_id,
int64_t lock_initiator) {
std::string lock_key = meta_delete_bitmap_update_lock_key({instance_id, table_id, -1});
std::string lock_val;
TxnErrorCode err = txn->get(lock_key, &lock_val);
LOG(INFO) << "get remove delete bitmap update lock info, table_id=" << table_id
<< " key=" << hex(lock_key) << " err=" << err << " initiator=" << lock_initiator;
if (err != TxnErrorCode::TXN_OK) {
LOG(WARNING) << "failed to get delete bitmap update lock key, instance_id=" << instance_id
<< " table_id=" << table_id << " key=" << hex(lock_key) << " err=" << err;
return;
}
DeleteBitmapUpdateLockPB lock_info;
if (!lock_info.ParseFromString(lock_val)) [[unlikely]] {
LOG(WARNING) << "failed to parse DeleteBitmapUpdateLockPB, instance_id=" << instance_id
<< " table_id=" << table_id << " key=" << hex(lock_key);
return;
}
if (lock_info.lock_id() != lock_id) {
return;
}
bool found = false;
auto initiators = lock_info.mutable_initiators();
for (auto iter = initiators->begin(); iter != initiators->end(); iter++) {
if (*iter == lock_initiator) {
initiators->erase(iter);
found = true;
break;
}
}
if (!found) {
INSTANCE_LOG(WARNING) << "failed to find lock_initiator, table_id=" << table_id
<< " tablet_id=" << tablet_id << " lock_id=" << lock_id
<< " initiator=" << lock_initiator << " key=" << hex(lock_key);
return;
}
if (initiators->empty()) {
INSTANCE_LOG(INFO) << "remove delete bitmap lock, table_id=" << table_id
<< " tablet_id=" << tablet_id << " lock_id=" << lock_id
<< " initiator=" << lock_initiator << " key=" << hex(lock_key);
txn->remove(lock_key);
return;
}
lock_info.SerializeToString(&lock_val);
if (lock_val.empty()) {
INSTANCE_LOG(WARNING) << "failed to seiralize lock_info, table_id=" << table_id
<< " key=" << hex(lock_key);
return;
}
INSTANCE_LOG(INFO) << "remove delete bitmap lock initiator, table_id=" << table_id
<< " tablet_id=" << tablet_id << " key=" << hex(lock_key)
<< " lock_id=" << lock_id << " initiator=" << lock_initiator
<< " initiators_size=" << lock_info.initiators_size();
txn->put(lock_key, lock_val);
}
static void remove_delete_bitmap_update_lock(std::unique_ptr<Transaction>& txn,
const std::string& instance_id, int64_t table_id,
int64_t tablet_id, int64_t lock_id,
int64_t lock_initiator, std::string use_version) {
VLOG_DEBUG << "remove_delete_bitmap_update_lock table_id=" << table_id
<< " initiator=" << lock_initiator << " tablet_id=" << tablet_id
<< " lock_id=" << lock_id << " use_version=" << use_version;
if (use_version == "v2" && is_job_delete_bitmap_lock_id(lock_id)) {
std::string tablet_job_key = mow_tablet_job_key({instance_id, table_id, lock_initiator});
std::string tablet_job_val;
TxnErrorCode err = txn->get(tablet_job_key, &tablet_job_val);
if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
remove_delete_bitmap_update_lock_v1(txn, instance_id, table_id, tablet_id, lock_id,
lock_initiator);
} else if (err != TxnErrorCode::TXN_OK) {
INSTANCE_LOG(WARNING) << "failed to get tablet job key, instance_id=" << instance_id
<< " table_id=" << table_id << " initiator=" << lock_initiator
<< " key=" << hex(tablet_job_key) << " err=" << err;
return;
} else {
txn->remove(tablet_job_key);
INSTANCE_LOG(INFO) << "remove tablet job key, table_id=" << table_id
<< ", key=" << hex(tablet_job_key)
<< " initiator=" << lock_initiator;
}
} else {
remove_delete_bitmap_update_lock_v1(txn, instance_id, table_id, tablet_id, lock_id,
lock_initiator);
}
}
int compaction_update_tablet_stats(const TabletCompactionJobPB& compaction, TabletStatsPB* stats,
MetaServiceCode& code, std::string& msg, int64_t now) {
if (compaction.type() == TabletCompactionJobPB::EMPTY_CUMULATIVE) {
stats->set_cumulative_compaction_cnt(stats->cumulative_compaction_cnt() + 1);
stats->set_cumulative_point(compaction.output_cumulative_point());
stats->set_last_cumu_compaction_time_ms(now * 1000);
} else if (compaction.type() == TabletCompactionJobPB::CUMULATIVE) {
// clang-format off
stats->set_cumulative_compaction_cnt(stats->cumulative_compaction_cnt() + 1);
if (compaction.output_cumulative_point() > stats->cumulative_point()) {
// After supporting parallel cumu compaction, compaction with older cumu point may be committed after
// new cumu point has been set, MUST NOT set cumu point back to old value
stats->set_cumulative_point(compaction.output_cumulative_point());
}
stats->set_num_rows(stats->num_rows() + (compaction.num_output_rows() - compaction.num_input_rows()));
stats->set_data_size(stats->data_size() + (compaction.size_output_rowsets() - compaction.size_input_rowsets()));
stats->set_num_rowsets(stats->num_rowsets() + (compaction.num_output_rowsets() - compaction.num_input_rowsets()));
stats->set_num_segments(stats->num_segments() + (compaction.num_output_segments() - compaction.num_input_segments()));
stats->set_index_size(stats->index_size() + (compaction.index_size_output_rowsets() - compaction.index_size_input_rowsets()));
stats->set_segment_size(stats->segment_size() + (compaction.segment_size_output_rowsets() - compaction.segment_size_input_rowsets()));
stats->set_last_cumu_compaction_time_ms(now * 1000);
// clang-format on
} else if (compaction.type() == TabletCompactionJobPB::BASE) {
// clang-format off
stats->set_base_compaction_cnt(stats->base_compaction_cnt() + 1);
stats->set_num_rows(stats->num_rows() + (compaction.num_output_rows() - compaction.num_input_rows()));
stats->set_data_size(stats->data_size() + (compaction.size_output_rowsets() - compaction.size_input_rowsets()));
stats->set_num_rowsets(stats->num_rowsets() + (compaction.num_output_rowsets() - compaction.num_input_rowsets()));
stats->set_num_segments(stats->num_segments() + (compaction.num_output_segments() - compaction.num_input_segments()));
stats->set_index_size(stats->index_size() + (compaction.index_size_output_rowsets() - compaction.index_size_input_rowsets()));
stats->set_segment_size(stats->segment_size() + (compaction.segment_size_output_rowsets() - compaction.segment_size_input_rowsets()));
stats->set_last_base_compaction_time_ms(now * 1000);
// clang-format on
} else if (compaction.type() == TabletCompactionJobPB::FULL) {
// clang-format off
stats->set_base_compaction_cnt(stats->base_compaction_cnt() + 1);
stats->set_full_compaction_cnt(stats->has_full_compaction_cnt() ? stats->full_compaction_cnt() + 1 : 1);
if (compaction.output_cumulative_point() > stats->cumulative_point()) {
// After supporting parallel cumu compaction, compaction with older cumu point may be committed after
// new cumu point has been set, MUST NOT set cumu point back to old value
stats->set_cumulative_point(compaction.output_cumulative_point());
}
stats->set_num_rows(stats->num_rows() + (compaction.num_output_rows() - compaction.num_input_rows()));
stats->set_data_size(stats->data_size() + (compaction.size_output_rowsets() - compaction.size_input_rowsets()));
stats->set_num_rowsets(stats->num_rowsets() + (compaction.num_output_rowsets() - compaction.num_input_rowsets()));
stats->set_num_segments(stats->num_segments() + (compaction.num_output_segments() - compaction.num_input_segments()));
stats->set_index_size(stats->index_size() + (compaction.index_size_output_rowsets() - compaction.index_size_input_rowsets()));
stats->set_segment_size(stats->segment_size() + (compaction.segment_size_output_rowsets() - compaction.segment_size_input_rowsets()));
stats->set_last_full_compaction_time_ms(now * 1000);
// clang-format on
} else {
msg = "invalid compaction type";
code = MetaServiceCode::INVALID_ARGUMENT;
return -1;
}
return 0;
}
std::pair<MetaServiceCode, std::string> scan_compaction_input_rowsets(
Transaction* txn, std::string_view instance_id, int64_t tablet_id, std::string& rs_start,
std::string& rs_end, int& num_rowsets, auto&& callback) {
std::unique_ptr<RangeGetIterator> it;
DORIS_CLOUD_DEFER {
INSTANCE_LOG(INFO) << "get rowset meta, num_rowsets=" << num_rowsets << " range=["
<< hex(rs_start) << "," << hex(rs_end) << "]";
};
auto rs_start1 = rs_start;
do {
TxnErrorCode err = txn->get(rs_start1, rs_end, &it);
if (err != TxnErrorCode::TXN_OK) {
return {cast_as<ErrCategory::READ>(err),
fmt::format("internal error, failed to get rowset range, err={} tablet_id={} "
"range=[{}, {})",
err, tablet_id, hex(rs_start), hex(rs_end))};
}
while (it->has_next()) {
auto [k, v] = it->next();
doris::RowsetMetaCloudPB rs;
if (!rs.ParseFromArray(v.data(), v.size())) {
return {MetaServiceCode::PROTOBUF_PARSE_ERR,
fmt::format(
"malformed rowset meta, unable to deserialize, tablet_id={} key={}",
tablet_id, hex(k))};
}
callback(std::move(rs));
++num_rowsets;
if (!it->has_next()) rs_start1 = k;
}
rs_start1.push_back('\x00'); // Update to next smallest key for iteration
} while (it->more());
return {MetaServiceCode::OK, ""};
}
void process_compaction_job(MetaServiceCode& code, std::string& msg, std::stringstream& ss,
std::unique_ptr<Transaction>& txn,
const FinishTabletJobRequest* request,
FinishTabletJobResponse* response, TabletJobInfoPB& recorded_job,
std::string& instance_id, std::string& job_key, bool& need_commit,
std::string& use_version, bool is_versioned_read,
bool is_versioned_write, TxnKv* txn_kv, ResourceManager* resource_mgr) {
//==========================================================================
// check
//==========================================================================
int64_t table_id = request->job().idx().table_id();
int64_t index_id = request->job().idx().index_id();
int64_t partition_id = request->job().idx().partition_id();
int64_t tablet_id = request->job().idx().tablet_id();
CompactionLogPB compaction_log;
compaction_log.set_tablet_id(tablet_id);
if (recorded_job.compaction().empty()) {
SS << "there is no running compaction, tablet_id=" << tablet_id;
msg = ss.str();
code = MetaServiceCode::INVALID_ARGUMENT;
return;
}
auto& compaction = request->job().compaction(0);
auto recorded_compaction = recorded_job.mutable_compaction()->begin();
for (; recorded_compaction != recorded_job.mutable_compaction()->end(); ++recorded_compaction) {
if (recorded_compaction->id() == compaction.id()) break;
}
if (recorded_compaction == recorded_job.mutable_compaction()->end()) {
SS << "unmatched job id, recorded_job=" << proto_to_json(recorded_job)
<< " given_job=" << proto_to_json(compaction);
code = MetaServiceCode::INVALID_ARGUMENT;
msg = ss.str();
return;
}
using namespace std::chrono;
int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
if (recorded_compaction->expiration() > 0 && recorded_compaction->expiration() < now) {
code = MetaServiceCode::JOB_EXPIRED;
SS << "expired compaction job, tablet_id=" << tablet_id
<< " job=" << proto_to_json(*recorded_compaction);
msg = ss.str();
// FIXME: Just remove or notify to abort?
// LOG(INFO) << "remove expired job, tablet_id=" << tablet_id << " key=" << hex(job_key);
return;
}
if (request->action() != FinishTabletJobRequest::COMMIT &&
request->action() != FinishTabletJobRequest::ABORT &&
request->action() != FinishTabletJobRequest::LEASE) {
SS << "unsupported action, tablet_id=" << tablet_id << " action=" << request->action();
msg = ss.str();
code = MetaServiceCode::INVALID_ARGUMENT;
return;
}
bool abort_compaction = false;
if (request->action() == FinishTabletJobRequest::COMMIT &&
!check_compaction_input_verions(compaction, recorded_job, ss)) {
msg = ss.str();
INSTANCE_LOG(INFO) << msg;
abort_compaction = true;
response->set_alter_version(recorded_job.schema_change().alter_version());
code = MetaServiceCode::JOB_CHECK_ALTER_VERSION;
}
//==========================================================================
// Abort
//==========================================================================
if (request->action() == FinishTabletJobRequest::ABORT || abort_compaction) {
// TODO(gavin): mv tmp rowsets to recycle or remove them directly
recorded_job.mutable_compaction()->erase(recorded_compaction);
auto job_val = recorded_job.SerializeAsString();
txn->put(job_key, job_val);
INSTANCE_LOG(INFO) << "abort tablet compaction job, tablet_id=" << tablet_id
<< " key=" << hex(job_key);
if (compaction.has_delete_bitmap_lock_initiator()) {
remove_delete_bitmap_update_lock(
txn, instance_id, table_id, tablet_id, COMPACTION_DELETE_BITMAP_LOCK_ID,
compaction.delete_bitmap_lock_initiator(), use_version);
}
need_commit = true;
return;
}
//==========================================================================
// Lease
//==========================================================================
if (request->action() == FinishTabletJobRequest::LEASE) {
if (compaction.lease() <= 0 || recorded_compaction->lease() > compaction.lease()) {
ss << "invalid lease. recoreded_lease=" << recorded_compaction->lease()
<< " req_lease=" << compaction.lease();
msg = ss.str();
code = MetaServiceCode::INVALID_ARGUMENT;
return;
}
recorded_compaction->set_lease(compaction.lease());
auto job_val = recorded_job.SerializeAsString();
txn->put(job_key, job_val);
INSTANCE_LOG(INFO) << "lease tablet compaction job, tablet_id=" << tablet_id
<< " key=" << hex(job_key);
need_commit = true;
return;
}
//==========================================================================
// Commit
//==========================================================================
//
// 1. update tablet stats
// 2. move compaction input rowsets to recycle
// 3. change tmp rowset to formal rowset
// 4. remove compaction job
//
//==========================================================================
// Update tablet stats
//==========================================================================
auto stats = response->mutable_stats();
CloneChainReader meta_reader(instance_id, resource_mgr);
TabletStats detached_stats;
if (is_versioned_read) {
// The compact stats = tablet stats, the load stats = detached stats
TxnErrorCode err = meta_reader.get_tablet_compact_stats(
txn.get(), tablet_id, stats, nullptr, config::snapshot_get_tablet_stats);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
msg = fmt::format("failed to get tablet compact stats, tablet_id={}, err={}", tablet_id,
err);
LOG(WARNING) << msg;
return;
}
TabletStatsPB load_stats;
err = meta_reader.get_tablet_load_stats(txn.get(), tablet_id, &load_stats, nullptr,
config::snapshot_get_tablet_stats);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
msg = fmt::format("failed to get tablet load stats, tablet_id={}, err={}", tablet_id,
err);
LOG(WARNING) << msg;
return;
}
detach_tablet_stats(load_stats, detached_stats);
} else {
// ATTN: The condition that snapshot read can be used to get tablet stats is: all other transactions that put tablet stats
// can make read write conflicts with this transaction on other keys. Currently, if all meta-service nodes are running
// with `config::split_tablet_stats = true` can meet the condition.
internal_get_tablet_stats(code, msg, txn.get(), instance_id, request->job().idx(), *stats,
detached_stats, config::snapshot_get_tablet_stats);
if (code != MetaServiceCode::OK) {
LOG_WARNING("failed to get tablet stats")
.tag("instance_id", instance_id)
.tag("tablet_id", tablet_id)
.tag("code", code)
.tag("msg", msg);
return;
}
}
if (compaction_update_tablet_stats(compaction, stats, code, msg, now) == -1) {
return;
}
auto stats_key = stats_tablet_key({instance_id, table_id, index_id, partition_id, tablet_id});
auto stats_val = stats->SerializeAsString();
VLOG_DEBUG << "data size, tablet_id=" << tablet_id << " stats.num_rows=" << stats->num_rows()
<< " stats.data_size=" << stats->data_size()
<< " stats.num_rowsets=" << stats->num_rowsets()
<< " stats.num_segments=" << stats->num_segments()
<< " stats.index_size=" << stats->index_size()
<< " stats.segment_size=" << stats->segment_size()
<< " detached_stats.num_rows=" << detached_stats.num_rows
<< " detached_stats.data_size=" << detached_stats.data_size
<< " detached_stats.num_rowset=" << detached_stats.num_rowsets
<< " detached_stats.num_segments=" << detached_stats.num_segs
<< " detached_stats.index_size=" << detached_stats.index_size
<< " detached_stats.segment_size=" << detached_stats.segment_size
<< " compaction.size_output_rowsets=" << compaction.size_output_rowsets()
<< " compaction.size_input_rowsets=" << compaction.size_input_rowsets();
txn->put(stats_key, stats_val);
if (is_versioned_write) {
std::string compact_stats_key =
versioned::tablet_compact_stats_key({instance_id, tablet_id});
LOG_INFO("put versioned tablet compact stats key")
.tag("compact_stats_key", hex(compact_stats_key))
.tag("tablet_id", tablet_id)
.tag("value_size", stats_val.size())
.tag("instance_id", instance_id);
versioned_put(txn.get(), compact_stats_key, stats_val);
}
merge_tablet_stats(*stats, detached_stats); // this is to check
if (stats->data_size() < 0 || stats->num_rowsets() < 1) [[unlikely]] {
INSTANCE_LOG(ERROR) << "buggy data size, tablet_id=" << tablet_id
<< " stats.num_rows=" << stats->num_rows()
<< " stats.data_size=" << stats->data_size()
<< " stats.num_rowsets=" << stats->num_rowsets()
<< " stats.num_segments=" << stats->num_segments()
<< " stats.index_size=" << stats->index_size()
<< " stats.segment_size=" << stats->segment_size()
<< " detached_stats.num_rows=" << detached_stats.num_rows
<< " detached_stats.data_size=" << detached_stats.data_size
<< " detached_stats.num_rowset=" << detached_stats.num_rowsets
<< " detached_stats.num_segments=" << detached_stats.num_segs
<< " detached_stats.index_size=" << detached_stats.index_size
<< " detached_stats.segment_size=" << detached_stats.segment_size
<< " compaction.size_output_rowsets="
<< compaction.size_output_rowsets()
<< " compaction.size_input_rowsets=" << compaction.size_input_rowsets();
DCHECK(false) << "buggy data size, tablet_id=" << tablet_id;
}
VLOG_DEBUG << "update tablet stats tablet_id=" << tablet_id << " key=" << hex(stats_key)
<< " stats=" << proto_to_json(*stats);
if (compaction.type() == TabletCompactionJobPB::EMPTY_CUMULATIVE) {
recorded_job.mutable_compaction()->erase(recorded_compaction);
auto job_val = recorded_job.SerializeAsString();
txn->put(job_key, job_val);
INSTANCE_LOG(INFO) << "remove compaction job, tablet_id=" << tablet_id
<< " key=" << hex(job_key);
need_commit = true;
return;
}
// remove delete bitmap update lock for MoW table
if (compaction.has_delete_bitmap_lock_initiator()) {
bool success = check_and_remove_delete_bitmap_update_lock(
code, msg, ss, txn, instance_id, table_id, tablet_id,
COMPACTION_DELETE_BITMAP_LOCK_ID, compaction.delete_bitmap_lock_initiator(),
use_version);
if (!success) {
return;
}
}
//==========================================================================
// Move input rowsets to recycle
//==========================================================================
if (compaction.input_versions_size() != 2 || compaction.output_versions_size() != 1 ||
compaction.output_rowset_ids_size() != 1) {
code = MetaServiceCode::INVALID_ARGUMENT;
SS << "invalid input or output versions, input_versions_size="
<< compaction.input_versions_size()
<< " output_versions_size=" << compaction.output_versions_size()
<< " output_rowset_ids_size=" << compaction.output_rowset_ids_size();
msg = ss.str();
return;
}
auto start = compaction.input_versions(0);
auto end = compaction.input_versions(1);
auto rs_start = meta_rowset_key({instance_id, tablet_id, start});
auto rs_end = meta_rowset_key({instance_id, tablet_id, end + 1});
compaction_log.set_start_version(start);
compaction_log.set_end_version(end);
int num_rowsets = 0;
auto handle_compaction_input_rowset_meta = [&](doris::RowsetMetaCloudPB rs) {
// remove delete bitmap of input rowset for MoW table
if (compaction.has_delete_bitmap_lock_initiator()) {
auto delete_bitmap_start =
meta_delete_bitmap_key({instance_id, tablet_id, rs.rowset_id_v2(), 0, 0});
auto delete_bitmap_end = meta_delete_bitmap_key(
{instance_id, tablet_id, rs.rowset_id_v2(), INT64_MAX, INT64_MAX});
txn->remove(delete_bitmap_start, delete_bitmap_end);
}
auto recycle_key = recycle_rowset_key({instance_id, tablet_id, rs.rowset_id_v2()});
RecycleRowsetPB recycle_rowset;
recycle_rowset.set_creation_time(now);
recycle_rowset.mutable_rowset_meta()->CopyFrom(rs);
if (config::enable_recycle_rowset_strip_key_bounds) {
// Strip key bounds to shrink operation log for ts compaction recycle entries
recycle_rowset.mutable_rowset_meta()->clear_segments_key_bounds();
recycle_rowset.mutable_rowset_meta()->clear_segments_key_bounds_truncated();
}
recycle_rowset.set_type(RecycleRowsetPB::COMPACT);
if (is_versioned_write) {
compaction_log.add_recycle_rowsets()->Swap(&recycle_rowset);
} else {
auto recycle_val = recycle_rowset.SerializeAsString();
txn->put(recycle_key, recycle_val);
INSTANCE_LOG(INFO) << "put recycle rowset, tablet_id=" << tablet_id
<< " key=" << hex(recycle_key);
}
};
if (!is_versioned_read) {
std::tie(code, msg) =
scan_compaction_input_rowsets(txn.get(), instance_id, tablet_id, rs_start, rs_end,
num_rowsets, handle_compaction_input_rowset_meta);
if (code != MetaServiceCode::OK) {
LOG(WARNING) << msg;
return;
}
} else {
std::vector<RowsetMetaCloudPB> rowset_metas;
TxnErrorCode err =
meta_reader.get_rowset_metas(txn.get(), tablet_id, start, end, &rowset_metas);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
msg = fmt::format("failed to get rowset metas, tablet_id={}, start={}, end={}, err={}",
tablet_id, start, end, err);
LOG(WARNING) << msg;
return;
}
num_rowsets = rowset_metas.size();
for (auto&& rowset_meta : rowset_metas) {
handle_compaction_input_rowset_meta(std::move(rowset_meta));
}
}
txn->remove(rs_start, rs_end);
LOG_INFO("cloud process compaction job txn remove meta rowset key")
.tag("instance_id", instance_id)
.tag("tablet_id", tablet_id)
.tag("start_version", start)
.tag("end_version", end + 1)
.tag("rs_start key", hex(rs_start))
.tag("rs_end key", hex(rs_end));
TEST_SYNC_POINT_CALLBACK("process_compaction_job::loop_input_done", &num_rowsets);
// compaction.num_input_rowsets is 0 when multiple hole rowsets are compacted,
// we can continue to process the job for this case.
if (num_rowsets < 1 && compaction.num_input_rowsets() > 0) {
SS << "too few input rowsets, tablet_id=" << tablet_id << " num_rowsets=" << num_rowsets;
code = MetaServiceCode::UNDEFINED_ERR;
msg = ss.str();
recorded_job.mutable_compaction()->erase(recorded_compaction);
auto job_val = recorded_job.SerializeAsString();
txn->put(job_key, job_val);
INSTANCE_LOG(INFO) << "remove compaction job, tablet_id=" << tablet_id
<< " key=" << hex(job_key);
need_commit = true;
TEST_SYNC_POINT_CALLBACK("process_compaction_job::too_few_rowsets", &need_commit);
return;
}
//==========================================================================
// Change tmp rowset to formal rowset
//==========================================================================
if (compaction.txn_id_size() != 1) {
SS << "invalid txn_id, txn_id_size=" << compaction.txn_id_size();
msg = ss.str();
code = MetaServiceCode::INVALID_ARGUMENT;
return;
}
int64_t txn_id = compaction.txn_id(0);
auto& rowset_id = compaction.output_rowset_ids(0);
if (txn_id <= 0 || rowset_id.empty()) {
SS << "invalid txn_id or rowset_id, tablet_id=" << tablet_id << " txn_id=" << txn_id
<< " rowset_id=" << rowset_id;
msg = ss.str();
code = MetaServiceCode::INVALID_ARGUMENT;
return;
}
auto tmp_rowset_key = meta_rowset_tmp_key({instance_id, txn_id, tablet_id});
std::string tmp_rowset_val;
TxnErrorCode err = txn->get(tmp_rowset_key, &tmp_rowset_val);
if (err != TxnErrorCode::TXN_OK) {
SS << "failed to get tmp rowset key"
<< (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? " (not found)" : "")
<< ", tablet_id=" << tablet_id << " tmp_rowset_key=" << hex(tmp_rowset_key)
<< ", err=" << err;
msg = ss.str();
code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::UNDEFINED_ERR
: cast_as<ErrCategory::READ>(err);
return;
}
doris::RowsetMetaCloudPB rs_meta;
rs_meta.ParseFromString(tmp_rowset_val);
if (rs_meta.txn_id() <= 0) {
SS << "invalid txn_id in output tmp rowset meta, tablet_id=" << tablet_id
<< " txn_id=" << rs_meta.txn_id();
msg = ss.str();
code = MetaServiceCode::INVALID_ARGUMENT;
return;
}
if (rs_meta.has_is_recycled() && rs_meta.is_recycled()) {
SS << "rowset has already been marked as recycled, tablet_id=" << tablet_id
<< " txn_id=" << rs_meta.txn_id() << " rowset_id=" << rs_meta.rowset_id_v2();
msg = ss.str();
code = MetaServiceCode::TXN_ALREADY_ABORTED;
return;
}
txn->remove(tmp_rowset_key);
INSTANCE_LOG(INFO) << "remove tmp rowset meta, tablet_id=" << tablet_id
<< " tmp_rowset_key=" << hex(tmp_rowset_key);
using namespace std::chrono;
auto rowset_visible_time =
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
rs_meta.set_visible_ts_ms(rowset_visible_time);
std::string rowset_val;
if (!rs_meta.SerializeToString(&rowset_val)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
SS << "failed to serialize rowset meta, tablet_id=" << tablet_id
<< " rowset_id=" << rowset_id;
msg = ss.str();
return;
}
int64_t version = compaction.output_versions(0);
auto rowset_key = meta_rowset_key({instance_id, tablet_id, version});
txn->put(rowset_key, rowset_val);
if (is_versioned_write) {
std::string meta_rowset_compact_key =
versioned::meta_rowset_compact_key({instance_id, tablet_id, version});
// Put versioned rowset compact metadata for output rowset
LOG_INFO("put versioned meta rowset compact key")
.tag("instance_id", instance_id)
.tag("meta_rowset_compact_key", hex(meta_rowset_compact_key))
.tag("tablet_id", tablet_id)
.tag("version", version);
if (!versioned::document_put(txn.get(), meta_rowset_compact_key, std::move(rs_meta))) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
msg = fmt::format("failed to serialize versioned rowset meta, key={}",
hex(meta_rowset_compact_key));
return;
}
LOG(INFO) << "put meta_rowset_compact_key, tablet_id=" << tablet_id
<< " end_version=" << version << " key=" << hex(meta_rowset_compact_key);
}
INSTANCE_LOG(INFO) << "put rowset meta, tablet_id=" << tablet_id
<< " rowset_key=" << hex(rowset_key);
//==========================================================================
// Remove compaction job
//==========================================================================
// TODO(gavin): move deleted job info into recycle or history
recorded_job.mutable_compaction()->erase(recorded_compaction);
auto job_val = recorded_job.SerializeAsString();
txn->put(job_key, job_val);
INSTANCE_LOG(INFO) << "remove compaction job tabelt_id=" << tablet_id
<< " key=" << hex(job_key);
response->set_alter_version(recorded_job.has_schema_change() &&
recorded_job.schema_change().has_alter_version()
? recorded_job.schema_change().alter_version()
: -1);
need_commit = true;
if (!compaction_log.recycle_rowsets().empty() && is_versioned_write) {
size_t num_recycled_rowsets = compaction_log.recycle_rowsets().size();
std::string operation_log_key = versioned::log_key({instance_id});
OperationLogPB operation_log;
if (is_versioned_read) {
operation_log.set_min_timestamp(meta_reader.min_read_version());
}
operation_log.mutable_compaction()->Swap(&compaction_log);
versioned::blob_put(txn.get(), operation_log_key, operation_log);
LOG_INFO("put compaction operation log key")
.tag("instance_id", instance_id)
.tag("operation_log_key", hex(operation_log_key))
.tag("tablet_id", tablet_id)
.tag("recycle_rowsets_count", num_recycled_rowsets);
}
}
void schema_change_update_tablet_stats(const TabletSchemaChangeJobPB& schema_change,
TabletStatsPB* stats, int64_t num_remove_rows,
int64_t size_remove_rowsets, int64_t num_remove_rowsets,
int64_t num_remove_segments,
int64_t index_size_remove_rowsets,
int64_t segment_size_remove_rowsets) {
// ATTN: cumu point in job is from base tablet which may be fetched long time ago
// since the new tablet may have done cumu compactions with alter_version as initial cumu point
// current cumu point of new tablet may be larger than job.alter_version
// we need to keep the larger one in case of cumu point roll-back to
// break the basic assumptions of non-decreasing cumu point
stats->set_cumulative_point(
std::max(schema_change.output_cumulative_point(), stats->cumulative_point()));
stats->set_num_rows(stats->num_rows() + (schema_change.num_output_rows() - num_remove_rows));
stats->set_data_size(stats->data_size() +
(schema_change.size_output_rowsets() - size_remove_rowsets));
stats->set_num_rowsets(stats->num_rowsets() +
(schema_change.num_output_rowsets() - num_remove_rowsets));
stats->set_num_segments(stats->num_segments() +
(schema_change.num_output_segments() - num_remove_segments));
stats->set_index_size(stats->index_size() +
(schema_change.index_size_output_rowsets() - index_size_remove_rowsets));
stats->set_segment_size(stats->segment_size() + (schema_change.segment_size_output_rowsets() -
segment_size_remove_rowsets));
}
std::pair<MetaServiceCode, std::string> scan_schema_change_input_rowsets(
Transaction* txn, std::string_view instance_id, int64_t new_tablet_id,
std::string& rs_start, std::string& rs_end, auto&& callback) {
std::unique_ptr<RangeGetIterator> it;
auto rs_start1 = rs_start;
do {
TxnErrorCode err = txn->get(rs_start1, rs_end, &it);
if (err != TxnErrorCode::TXN_OK) {
return {MetaServiceCode::KV_TXN_GET_ERR,
fmt::format(
"internal error, failed to get rowset range, err={} new_tablet_id={} "
"range=[{}, {})",
err, new_tablet_id, hex(rs_start), hex(rs_end))};
}
while (it->has_next()) {
auto [k, v] = it->next();
doris::RowsetMetaCloudPB rs;
if (!rs.ParseFromArray(v.data(), v.size())) {
return {MetaServiceCode::PROTOBUF_PARSE_ERR,
fmt::format("malformed rowset meta, unable to deserialize, "
"new_tablet_id={} key={}",
new_tablet_id, hex(k))};
}
callback(std::move(rs));
if (!it->has_next()) rs_start1 = k;
}
rs_start1.push_back('\x00'); // Update to next smallest key for iteration
} while (it->more());
return {MetaServiceCode::OK, ""};
}
void process_schema_change_job(MetaServiceCode& code, std::string& msg, std::stringstream& ss,
std::unique_ptr<Transaction>& txn,
const FinishTabletJobRequest* request,
FinishTabletJobResponse* response, TabletJobInfoPB& recorded_job,
std::string& instance_id, std::string& job_key, bool& need_commit,
std::string& use_version, bool is_versioned_read,
bool is_versioned_write, TxnKv* txn_kv,
ResourceManager* resource_mgr) {
//==========================================================================
// check
//==========================================================================
int64_t tablet_id = request->job().idx().tablet_id();
auto& schema_change = request->job().schema_change();
int64_t new_tablet_id = schema_change.new_tablet_idx().tablet_id();
SchemaChangeLogPB schema_change_log;
schema_change_log.set_old_tablet_id(tablet_id);
schema_change_log.set_new_tablet_id(new_tablet_id);
if (new_tablet_id <= 0) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "no valid new_tablet_id given";
return;
}
if (new_tablet_id == tablet_id) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "not allow new_tablet_id same with base_tablet_id";
return;
}
CloneChainReader reader(instance_id, resource_mgr);
auto& new_tablet_idx = const_cast<TabletIndexPB&>(schema_change.new_tablet_idx());
if (!new_tablet_idx.has_table_id() || !new_tablet_idx.has_index_id() ||
!new_tablet_idx.has_partition_id()) {
if (!is_versioned_read) {
get_tablet_idx(code, msg, txn.get(), instance_id, new_tablet_id, new_tablet_idx);
if (code != MetaServiceCode::OK) return;
} else {
TxnErrorCode err = reader.get_tablet_index(txn.get(), tablet_id, &new_tablet_idx);
if (err != TxnErrorCode::TXN_OK) {
code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TABLET_NOT_FOUND
: cast_as<ErrCategory::READ>(err);
msg = fmt::format("failed to get new tablet index, tablet_id={}, err={}", tablet_id,
err);
return;
}
}
}
int64_t new_table_id = new_tablet_idx.table_id();
int64_t new_index_id = new_tablet_idx.index_id();
int64_t new_partition_id = new_tablet_idx.partition_id();
doris::TabletMetaCloudPB new_tablet_meta;
auto new_tablet_key = meta_tablet_key(
{instance_id, new_table_id, new_index_id, new_partition_id, new_tablet_id});
std::string new_tablet_val;
if (!is_versioned_read) {
TxnErrorCode err = txn->get(new_tablet_key, &new_tablet_val);
if (err != TxnErrorCode::TXN_OK) {
SS << "failed to get new tablet meta"
<< (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? " (not found)" : "")
<< " instance_id=" << instance_id << " tablet_id=" << new_tablet_id
<< " key=" << hex(new_tablet_key) << " err=" << err;
msg = ss.str();
code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TABLET_NOT_FOUND
: cast_as<ErrCategory::READ>(err);
return;
}
if (!new_tablet_meta.ParseFromString(new_tablet_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
msg = "malformed tablet meta";
return;
}
} else {
TxnErrorCode err =
reader.get_tablet_meta(txn.get(), new_tablet_id, &new_tablet_meta, nullptr);
if (err != TxnErrorCode::TXN_OK) {
SS << "failed to get new tablet meta"
<< (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? " (not found)" : "")
<< " instance_id=" << instance_id << " tablet_id=" << new_tablet_id
<< " err=" << err;
msg = ss.str();
code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TABLET_NOT_FOUND
: cast_as<ErrCategory::READ>(err);
return;
}
}
if (new_tablet_meta.tablet_state() == doris::TabletStatePB::PB_RUNNING) {
code = MetaServiceCode::JOB_ALREADY_SUCCESS;
msg = "schema_change job already success";
return;
}
if (!new_tablet_meta.has_tablet_state() ||
new_tablet_meta.tablet_state() != doris::TabletStatePB::PB_NOTREADY) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "invalid new tablet state";
return;
}
if (!recorded_job.has_schema_change()) {
SS << "there is no running schema_change, tablet_id=" << tablet_id;
msg = ss.str();
code = MetaServiceCode::INVALID_ARGUMENT;
return;
}
auto& recorded_schema_change = recorded_job.schema_change();
using namespace std::chrono;
int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
if (recorded_schema_change.expiration() > 0 && recorded_schema_change.expiration() < now) {
code = MetaServiceCode::JOB_EXPIRED;
SS << "expired schema_change job, tablet_id=" << tablet_id
<< " job=" << proto_to_json(recorded_schema_change);
msg = ss.str();
// FIXME: Just remove or notify to abort?
// LOG(INFO) << "remove expired job, tablet_id=" << tablet_id << " key=" << hex(job_key);
return;
}
// MUST check initiator to let the retried BE commit this schema_change job.
if (schema_change.id() != recorded_schema_change.id() ||
(schema_change.initiator() != recorded_schema_change.initiator() &&
request->action() != FinishTabletJobRequest::ABORT)) {
// abort is from FE, so initiator differ from the original one, just skip this check
SS << "unmatched job id or initiator, recorded_id=" << recorded_schema_change.id()
<< " given_id=" << schema_change.id()
<< " recorded_job=" << proto_to_json(recorded_schema_change)
<< " given_job=" << proto_to_json(schema_change);
code = MetaServiceCode::INVALID_ARGUMENT;
msg = ss.str();
return;
}
if (request->action() != FinishTabletJobRequest::COMMIT &&
request->action() != FinishTabletJobRequest::ABORT) {
SS << "unsupported action, tablet_id=" << tablet_id << " action=" << request->action();
msg = ss.str();
code = MetaServiceCode::INVALID_ARGUMENT;
return;
}
auto new_tablet_job_key = job_tablet_key(
{instance_id, new_table_id, new_index_id, new_partition_id, new_tablet_id});
std::string new_tablet_job_val;
TabletJobInfoPB new_recorded_job;
TxnErrorCode err = txn->get(new_tablet_job_key, &new_tablet_job_val);
if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
SS << "internal error,"
<< " instance_id=" << instance_id << " tablet_id=" << new_tablet_id
<< " job=" << proto_to_json(request->job()) << " err=" << err;
msg = ss.str();
code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::INVALID_ARGUMENT
: cast_as<ErrCategory::READ>(err);
return;
} else if (err == TxnErrorCode::TXN_OK) {
if (!new_recorded_job.ParseFromString(new_tablet_job_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
msg = "malformed new tablet recorded job";
return;
}
}
//==========================================================================
// Abort
//==========================================================================
if (request->action() == FinishTabletJobRequest::ABORT) {
if (schema_change.new_tablet_idx().index_id() ==
recorded_schema_change.new_tablet_idx().index_id() &&
schema_change.new_tablet_idx().tablet_id() ==
recorded_schema_change.new_tablet_idx().tablet_id()) {
// remove schema change
recorded_job.clear_schema_change();
auto job_val = recorded_job.SerializeAsString();
txn->put(job_key, job_val);
if (!new_tablet_job_val.empty()) {
auto& compactions = *new_recorded_job.mutable_compaction();
auto origin_size = compactions.size();
compactions.erase(
std::remove_if(
compactions.begin(), compactions.end(),
[&](auto& c) {
return c.has_delete_bitmap_lock_initiator() &&
c.delete_bitmap_lock_initiator() ==
schema_change.delete_bitmap_lock_initiator();
}),
compactions.end());
if (compactions.size() < origin_size) {
INSTANCE_LOG(INFO)
<< "remove " << (origin_size - compactions.size())
<< " STOP_TOKEN for schema_change job tablet_id=" << tablet_id
<< " delete_bitmap_lock_initiator="
<< schema_change.delete_bitmap_lock_initiator()
<< " key=" << hex(job_key);
}
new_recorded_job.clear_schema_change();
new_tablet_job_val = new_recorded_job.SerializeAsString();
txn->put(new_tablet_job_key, new_tablet_job_val);
}
INSTANCE_LOG(INFO) << "remove schema_change job tablet_id=" << tablet_id
<< " key=" << hex(job_key);
need_commit = true;
}
return;
}
//==========================================================================
// Commit
//==========================================================================
//
// 1. update new_tablet meta
// 2. move rowsets [2-alter_version] in new_tablet to recycle
// 3. update new_tablet stats
// 4. change tmp rowset to formal rowset
// 5. remove schema_change job
//
//==========================================================================
// update tablet meta
//==========================================================================
new_tablet_meta.set_tablet_state(doris::TabletStatePB::PB_RUNNING);
new_tablet_meta.set_cumulative_layer_point(schema_change.output_cumulative_point());
new_tablet_meta.SerializeToString(&new_tablet_val);
txn->put(new_tablet_key, new_tablet_val);
if (is_versioned_write) {
std::string versioned_new_tablet_key =
versioned::meta_tablet_key({instance_id, new_tablet_id});
versioned_put(txn.get(), versioned_new_tablet_key, new_tablet_val);
LOG(INFO) << "put versioned new tablet meta, new_tablet_id=" << new_tablet_id
<< " key=" << hex(versioned_new_tablet_key);
}
// process mow table, check lock
if (new_tablet_meta.enable_unique_key_merge_on_write()) {
bool success = check_and_remove_delete_bitmap_update_lock(
code, msg, ss, txn, instance_id, new_table_id, new_tablet_id,
SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID, schema_change.delete_bitmap_lock_initiator(),
use_version);
if (!success) {
return;
}
std::string pending_key = meta_pending_delete_bitmap_key({instance_id, new_tablet_id});
txn->remove(pending_key);
LOG(INFO) << "xxx sc remove delete bitmap pending key, pending_key=" << hex(pending_key)
<< " tablet_id=" << new_tablet_id << ", job_id=" << schema_change.id();
}
//==========================================================================
// move rowsets [2-alter_version] to recycle
//==========================================================================
if (!schema_change.has_alter_version()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "no alter_version for schema change job, tablet_id=" + std::to_string(tablet_id);
return;
}
schema_change_log.set_alter_version(schema_change.alter_version());
if (schema_change.alter_version() < 2) {
// no need to update stats
if (!new_tablet_job_val.empty()) {
new_recorded_job.clear_schema_change();
auto& compactions = *new_recorded_job.mutable_compaction();
auto origin_size = compactions.size();
compactions.erase(
std::remove_if(compactions.begin(), compactions.end(),
[&](auto& c) {
return c.has_delete_bitmap_lock_initiator() &&
c.delete_bitmap_lock_initiator() ==
schema_change.delete_bitmap_lock_initiator();
}),
compactions.end());
if (compactions.size() < origin_size) {
INSTANCE_LOG(INFO)
<< "remove " << (origin_size - compactions.size())
<< " STOP_TOKEN for schema_change job tablet_id=" << tablet_id
<< " delete_bitmap_lock_initiator="
<< schema_change.delete_bitmap_lock_initiator() << " key=" << hex(job_key);
}
new_tablet_job_val = new_recorded_job.SerializeAsString();
txn->put(new_tablet_job_key, new_tablet_job_val);
}
need_commit = true;
return;
}
int64_t num_remove_rows = 0;
int64_t size_remove_rowsets = 0;
int64_t num_remove_rowsets = 0;
int64_t num_remove_segments = 0;
int64_t index_size_remove_rowsets = 0;
int64_t segment_size_remove_rowsets = 0;
auto rs_start = meta_rowset_key({instance_id, new_tablet_id, 2});
auto rs_end = meta_rowset_key({instance_id, new_tablet_id, schema_change.alter_version() + 1});
auto handle_schema_change_input_rowset_meta = [&](doris::RowsetMetaCloudPB rs) {
num_remove_rows += rs.num_rows();
size_remove_rowsets += rs.total_disk_size();
++num_remove_rowsets;
num_remove_segments += rs.num_segments();
index_size_remove_rowsets += rs.index_disk_size();
segment_size_remove_rowsets += rs.data_disk_size();
int64_t start_version = rs.start_version(), end_version = rs.end_version();
auto recycle_key = recycle_rowset_key({instance_id, new_tablet_id, rs.rowset_id_v2()});
RecycleRowsetPB recycle_rowset;
recycle_rowset.set_creation_time(now);
recycle_rowset.mutable_rowset_meta()->CopyFrom(rs);
if (config::enable_recycle_rowset_strip_key_bounds) {
// Strip key bounds to shrink schema change recycle operation log entries
recycle_rowset.mutable_rowset_meta()->clear_segments_key_bounds();
recycle_rowset.mutable_rowset_meta()->clear_segments_key_bounds_truncated();
}
recycle_rowset.set_type(RecycleRowsetPB::DROP);
if (is_versioned_write) {
schema_change_log.add_recycle_rowsets()->Swap(&recycle_rowset);
} else {
auto recycle_val = recycle_rowset.SerializeAsString();
txn->put(recycle_key, recycle_val);
}
INSTANCE_LOG(INFO) << "put recycle rowset, new_tablet_id=" << new_tablet_id << " version=["
<< start_version << "-" << end_version << "] key=" << hex(recycle_key);
};
if (!is_versioned_read) {
std::tie(code, msg) =
scan_schema_change_input_rowsets(txn.get(), instance_id, new_tablet_id, rs_start,
rs_end, handle_schema_change_input_rowset_meta);
if (code != MetaServiceCode::OK) {
LOG(WARNING) << msg;
return;
}
} else {
std::vector<RowsetMetaCloudPB> rowset_metas;
TxnErrorCode err = reader.get_rowset_metas(txn.get(), new_tablet_id, 2,
schema_change.alter_version(), &rowset_metas);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
msg = fmt::format(
"failed to get rowset metas, new_tablet_id={}, start={}, end={}, err={}",
new_tablet_id, 2, schema_change.alter_version(), err);
LOG(WARNING) << msg;
return;
}
for (auto&& rowset_meta : rowset_metas) {
handle_schema_change_input_rowset_meta(std::move(rowset_meta));
}
}
txn->remove(rs_start, rs_end);
//==========================================================================
// update new_tablet stats
//==========================================================================
auto stats = response->mutable_stats();
TabletStats detached_stats;
if (is_versioned_read) {
TxnErrorCode err = reader.get_tablet_compact_stats(txn.get(), new_tablet_id, stats, nullptr,
config::snapshot_get_tablet_stats);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
msg = fmt::format("failed to get tablet compact stats, tablet_id={}, err={}",
new_tablet_id, err);
LOG(WARNING) << msg;
return;
}
TabletStatsPB load_stats;
err = reader.get_tablet_load_stats(txn.get(), new_tablet_id, &load_stats, nullptr,
config::snapshot_get_tablet_stats);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
msg = fmt::format("failed to get tablet load stats, tablet_id={}, err={}",
new_tablet_id, err);
LOG(WARNING) << msg;
return;
}
detach_tablet_stats(load_stats, detached_stats);
} else {
// ATTN: The condition that snapshot read can be used to get tablet stats is: all other transactions that put tablet stats
// can make read write conflicts with this transaction on other keys. Currently, if all meta-service nodes are running
// with `config::split_tablet_stats = true` can meet the condition.
internal_get_tablet_stats(code, msg, txn.get(), instance_id, new_tablet_idx, *stats,
detached_stats, config::snapshot_get_tablet_stats);
if (code != MetaServiceCode::OK) {
LOG_WARNING("failed to get tablet stats")
.tag("instance_id", instance_id)
.tag("tablet_id", tablet_id)
.tag("code", code)
.tag("msg", msg);
return;
}
}
schema_change_update_tablet_stats(schema_change, stats, num_remove_rows, size_remove_rowsets,
num_remove_rowsets, num_remove_segments,
index_size_remove_rowsets, segment_size_remove_rowsets);
auto stats_key = stats_tablet_key(
{instance_id, new_table_id, new_index_id, new_partition_id, new_tablet_id});
auto stats_val = stats->SerializeAsString();
txn->put(stats_key, stats_val);
if (is_versioned_write) {
std::string compact_stats_key =
versioned::tablet_compact_stats_key({instance_id, new_tablet_id});
versioned_put(txn.get(), compact_stats_key, stats_val);
LOG_INFO("put versioned tablet compact stats key")
.tag("tablet_id", tablet_id)
.tag("new_tablet_id", new_tablet_id)
.tag("compact_value_size", stats_val.size())
.tag("compact_stats_key", hex(compact_stats_key))
.tag("instance_id", instance_id);
}
merge_tablet_stats(*stats, detached_stats);
VLOG_DEBUG << "update tablet stats tablet_id=" << tablet_id << " key=" << hex(stats_key)
<< " stats=" << proto_to_json(*stats);
//==========================================================================
// change tmp rowset to formal rowset
//==========================================================================
if (schema_change.txn_ids().empty() || schema_change.output_versions().empty()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "empty txn_ids or output_versions";
return;
}
for (size_t i = 0; i < schema_change.txn_ids().size(); ++i) {
auto tmp_rowset_key =
meta_rowset_tmp_key({instance_id, schema_change.txn_ids().at(i), new_tablet_id});
std::string tmp_rowset_val;
// FIXME: async get
TxnErrorCode err = txn->get(tmp_rowset_key, &tmp_rowset_val);
if (err != TxnErrorCode::TXN_OK) {
SS << "failed to get tmp rowset key"
<< (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? " (not found)" : "")
<< ", tablet_id=" << new_tablet_id << " tmp_rowset_key=" << hex(tmp_rowset_key)
<< ", err=" << err;
msg = ss.str();
code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::UNDEFINED_ERR
: cast_as<ErrCategory::READ>(err);
return;
}
RowsetMetaCloudPB tmp_rowset_meta;
if (!tmp_rowset_meta.ParseFromString(tmp_rowset_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
SS << "malformed tmp rowset meta, unable to deserialize, tablet_id=" << new_tablet_id
<< " key=" << hex(tmp_rowset_key);
msg = ss.str();
return;
}
if (tmp_rowset_meta.has_is_recycled() && tmp_rowset_meta.is_recycled()) {
SS << "rowset has already been marked as recycled, tablet_id=" << new_tablet_id
<< " txn_id=" << tmp_rowset_meta.txn_id()
<< " rowset_id=" << tmp_rowset_meta.rowset_id_v2();
msg = ss.str();
code = MetaServiceCode::TXN_ALREADY_ABORTED;
return;
}
using namespace std::chrono;
auto rowset_visible_time =
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
tmp_rowset_meta.set_visible_ts_ms(rowset_visible_time);
std::string rowset_val;
if (!tmp_rowset_meta.SerializeToString(&rowset_val)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
SS << "failed to serialize rowset meta, tablet_id=" << new_tablet_id
<< " rowset_id=" << tmp_rowset_meta.rowset_id_v2();
msg = ss.str();
return;
}
auto rowset_key = meta_rowset_key(
{instance_id, new_tablet_id, schema_change.output_versions().at(i)});
txn->put(rowset_key, rowset_val);
txn->remove(tmp_rowset_key);
if (is_versioned_write) {
doris::RowsetMetaCloudPB rs_meta(tmp_rowset_meta);
std::string meta_rowset_compact_key = versioned::meta_rowset_compact_key(
{instance_id, new_tablet_id, rs_meta.end_version()});
// Put versioned rowset compact metadata for new tablet's rowsets
LOG_INFO("put sc versioned meta rowset compact key")
.tag("instance_id", instance_id)
.tag("meta_rowset_compact_key", hex(meta_rowset_compact_key))
.tag("new_tablet_id", new_tablet_id);
if (rs_meta.txn_id() <= 0) {
SS << "invalid txn_id in output tmp rowset meta, new_tablet_id=" << new_tablet_id
<< " txn_id=" << rs_meta.txn_id();
msg = ss.str();
code = MetaServiceCode::INVALID_ARGUMENT;
return;
}
if (!versioned::document_put(txn.get(), meta_rowset_compact_key, std::move(rs_meta))) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
msg = fmt::format("failed to serialize versioned rowset meta, key={}",
hex(meta_rowset_compact_key));
return;
}
}
}
//==========================================================================
// remove schema_change job
//==========================================================================
recorded_job.clear_schema_change();
auto job_val = recorded_job.SerializeAsString();
txn->put(job_key, job_val);
if (!new_tablet_job_val.empty()) {
auto& compactions = *new_recorded_job.mutable_compaction();
auto origin_size = compactions.size();
compactions.erase(
std::remove_if(compactions.begin(), compactions.end(),
[&](auto& c) {
return c.has_delete_bitmap_lock_initiator() &&
c.delete_bitmap_lock_initiator() ==
schema_change.delete_bitmap_lock_initiator();
}),
compactions.end());
if (compactions.size() < origin_size) {
INSTANCE_LOG(INFO) << "remove " << (origin_size - compactions.size())
<< " STOP_TOKEN for schema_change job tablet_id=" << tablet_id
<< " delete_bitmap_lock_initiator="
<< schema_change.delete_bitmap_lock_initiator()
<< " key=" << hex(job_key);
}
new_recorded_job.clear_schema_change();
new_tablet_job_val = new_recorded_job.SerializeAsString();
txn->put(new_tablet_job_key, new_tablet_job_val);
}
INSTANCE_LOG(INFO) << "remove schema_change job tablet_id=" << tablet_id
<< " key=" << hex(job_key);
need_commit = true;
if (is_versioned_write) {
std::string operation_log_key = versioned::log_key({instance_id});
OperationLogPB operation_log;
if (is_versioned_read) {
operation_log.set_min_timestamp(reader.min_read_version());
}
operation_log.mutable_schema_change()->Swap(&schema_change_log);
versioned::blob_put(txn.get(), operation_log_key, operation_log);
LOG_INFO("put schema change operation log key")
.tag("instance_id", instance_id)
.tag("operation_log_key", hex(operation_log_key))
.tag("tablet_id", tablet_id)
.tag("new_tablet_id", new_tablet_id)
.tag("recycle_rowsets_count", schema_change_log.recycle_rowsets().size());
}
}
void _finish_tablet_job(const FinishTabletJobRequest* request, FinishTabletJobResponse* response,
std::string& instance_id, std::unique_ptr<Transaction>& txn, TxnKv* txn_kv,
DeleteBitmapLockWhiteList* delete_bitmap_lock_white_list,
ResourceManager* resource_mgr, MetaServiceCode& code, std::string& msg,
std::stringstream& ss) {
bool is_versioned_read = resource_mgr->is_version_read_enabled(instance_id);
bool is_versioned_write = resource_mgr->is_version_write_enabled(instance_id);
for (int retry = 0; retry <= 1; retry++) {
bool need_commit = false;
TxnErrorCode err = txn_kv->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::CREATE>(err);
msg = "failed to create txn";
return;
}
int64_t tablet_id = request->job().idx().tablet_id();
if (tablet_id <= 0) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "no valid tablet_id given";
return;
}
auto& tablet_idx = const_cast<TabletIndexPB&>(request->job().idx());
if (!tablet_idx.has_table_id() || !tablet_idx.has_index_id() ||
!tablet_idx.has_partition_id()) {
if (!is_versioned_read) {
get_tablet_idx(code, msg, txn.get(), instance_id, tablet_id, tablet_idx);
if (code != MetaServiceCode::OK) return;
} else {
CloneChainReader reader(instance_id, resource_mgr);
err = reader.get_tablet_index(txn.get(), tablet_id, &tablet_idx);
if (err != TxnErrorCode::TXN_OK) {
code = err == TxnErrorCode::TXN_KEY_NOT_FOUND
? MetaServiceCode::TABLET_NOT_FOUND
: cast_as<ErrCategory::READ>(err);
msg = fmt::format("failed to get tablet index, tablet_id={}, err={}", tablet_id,
err);
LOG(WARNING) << msg;
return;
}
}
}
// Check if tablet has been dropped
if (is_dropped_tablet(txn.get(), instance_id, tablet_idx.index_id(),
tablet_idx.partition_id())) {
code = MetaServiceCode::TABLET_NOT_FOUND;
msg = fmt::format("tablet {} has been dropped", tablet_id);
return;
}
// TODO(gavin): remove duplicated code with start_tablet_job()
// Begin to process finish tablet job
std::string job_key =
job_tablet_key({instance_id, tablet_idx.table_id(), tablet_idx.index_id(),
tablet_idx.partition_id(), tablet_id});
std::string job_val;
err = txn->get(job_key, &job_val);
if (err != TxnErrorCode::TXN_OK) {
SS << (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? "job not found," : "internal error,")
<< " instance_id=" << instance_id << " tablet_id=" << tablet_id
<< " job=" << proto_to_json(request->job()) << " err=" << err;
msg = ss.str();
code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::INVALID_ARGUMENT
: cast_as<ErrCategory::READ>(err);
return;
}
TabletJobInfoPB recorded_job;
recorded_job.ParseFromString(job_val);
VLOG_DEBUG << "get tablet job, tablet_id=" << tablet_id
<< " job=" << proto_to_json(recorded_job);
FinishTabletJobRequest_Action action = request->action();
std::string use_version =
delete_bitmap_lock_white_list->get_delete_bitmap_lock_version(instance_id);
LOG(INFO) << "finish_tablet_job instance_id=" << instance_id
<< " use_version=" << use_version;
if (!request->job().compaction().empty()) {
// Process compaction commit
process_compaction_job(code, msg, ss, txn, request, response, recorded_job, instance_id,
job_key, need_commit, use_version, is_versioned_read,
is_versioned_write, txn_kv, resource_mgr);
} else if (request->job().has_schema_change()) {
// Process schema change commit
process_schema_change_job(code, msg, ss, txn, request, response, recorded_job,
instance_id, job_key, need_commit, use_version,
is_versioned_read, is_versioned_write, txn_kv, resource_mgr);
}
if (!need_commit) return;
err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
if (err == TxnErrorCode::TXN_CONFLICT) {
if (action == FinishTabletJobRequest::COMMIT) {
g_bvar_delete_bitmap_lock_txn_remove_conflict_by_compaction_commit_counter << 1;
} else if (action == FinishTabletJobRequest::LEASE) {
g_bvar_delete_bitmap_lock_txn_remove_conflict_by_compaction_lease_counter << 1;
} else if (action == FinishTabletJobRequest::ABORT) {
g_bvar_delete_bitmap_lock_txn_remove_conflict_by_compaction_abort_counter << 1;
}
if (retry == 0 && !request->job().compaction().empty() &&
request->job().compaction(0).has_delete_bitmap_lock_initiator()) {
// Do a fast retry for mow when commit compaction job.
// The only fdb txn conflict here is that during the compaction job commit,
// a compaction lease RPC comes and finishes before the commit,
// so we retry to commit the compaction job again.
response->Clear();
code = MetaServiceCode::OK;
msg.clear();
continue;
}
}
code = cast_as<ErrCategory::COMMIT>(err);
ss << "failed to commit job kv, err=" << err;
msg = ss.str();
return;
}
break;
}
}
void MetaServiceImpl::finish_tablet_job(::google::protobuf::RpcController* controller,
const FinishTabletJobRequest* request,
FinishTabletJobResponse* response,
::google::protobuf::Closure* done) {
RPC_PREPROCESS(finish_tablet_job, get, put, del);
std::string cloud_unique_id = request->cloud_unique_id();
instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
if (instance_id.empty()) {
code = MetaServiceCode::INVALID_ARGUMENT;
SS << "cannot find instance_id with cloud_unique_id="
<< (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id);
msg = ss.str();
LOG(INFO) << msg;
return;
}
RPC_RATE_LIMIT(finish_tablet_job)
if (!request->has_job() ||
(request->job().compaction().empty() && !request->job().has_schema_change())) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "no valid job specified";
return;
}
_finish_tablet_job(request, response, instance_id, txn, txn_kv_.get(),
delete_bitmap_lock_white_list_.get(), resource_mgr_.get(), code, msg, ss);
}
#undef SS
#undef INSTANCE_LOG
} // namespace doris::cloud