blob: 454eb1ac643af9064c5d15d01488d89f18bd8b1b [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "olap/rowset/rowset_meta_manager.h"
#include <fmt/format.h>
#include <gen_cpp/olap_file.pb.h>
#include <boost/algorithm/string/trim.hpp>
#include <fstream>
#include <functional>
#include <memory>
#include <new>
#include <string>
#include <string_view>
#include <vector>
#include "common/logging.h"
#include "olap/binlog.h"
#include "olap/olap_define.h"
#include "olap/olap_meta.h"
#include "olap/utils.h"
#include "util/debug_points.h"
namespace doris {
using namespace ErrorCode;
// Quick probe for a rowset meta entry keyed "<ROWSET_PREFIX><tablet_uid>_<rowset_id>".
// Delegates to OlapMeta::key_may_exist; going by that name the answer may be a
// "maybe" rather than a definitive hit (NOTE(review): confirm against OlapMeta).
// exists() performs a full get of the same key.
bool RowsetMetaManager::check_rowset_meta(OlapMeta* meta, TabletUid tablet_uid,
                                          const RowsetId& rowset_id) {
    const std::string rowset_key =
            fmt::format("{}{}_{}", ROWSET_PREFIX, tablet_uid.to_string(), rowset_id.to_string());
    // Out-parameter required by key_may_exist; its content is not used here.
    std::string value_hint;
    return meta->key_may_exist(META_COLUMN_FAMILY_INDEX, rowset_key, &value_hint);
}
// Looks up the rowset meta key "<ROWSET_PREFIX><tablet_uid>_<rowset_id>" and returns the
// status of that lookup unchanged. The fetched value is discarded.
Status RowsetMetaManager::exists(OlapMeta* meta, TabletUid tablet_uid, const RowsetId& rowset_id) {
    const std::string rowset_key =
            fmt::format("{}{}_{}", ROWSET_PREFIX, tablet_uid.to_string(), rowset_id.to_string());
    std::string fetched_value;
    return meta->get(META_COLUMN_FAMILY_INDEX, rowset_key, &fetched_value);
}
// Loads the serialized rowset meta stored under "<ROWSET_PREFIX><tablet_uid>_<rowset_id>"
// and initializes `rowset_meta` from it in place.
// Returns:
//   META_KEY_NOT_FOUND       if the key is absent,
//   IO_ERROR                 for any other lookup failure,
//   SERIALIZE_PROTOBUF_ERROR if RowsetMeta::init rejects the stored bytes,
//   OK                       otherwise.
Status RowsetMetaManager::get_rowset_meta(OlapMeta* meta, TabletUid tablet_uid,
                                          const RowsetId& rowset_id,
                                          RowsetMetaSharedPtr rowset_meta) {
    const std::string rowset_key =
            fmt::format("{}{}_{}", ROWSET_PREFIX, tablet_uid.to_string(), rowset_id.to_string());
    std::string serialized_meta;
    Status get_status = meta->get(META_COLUMN_FAMILY_INDEX, rowset_key, &serialized_meta);
    if (get_status.is<META_KEY_NOT_FOUND>()) {
        return Status::Error<META_KEY_NOT_FOUND>("rowset id: {} not found.", rowset_key);
    }
    if (!get_status.ok()) {
        return Status::Error<IO_ERROR>("load rowset id: {} failed.", rowset_key);
    }
    if (!rowset_meta->init(serialized_meta)) {
        return Status::Error<SERIALIZE_PROTOBUF_ERROR>("parse rowset meta failed. rowset id: {}",
                                                       rowset_key);
    }
    return Status::OK();
}
// Loads a rowset meta via get_rowset_meta() and renders it as JSON into `json_rowset_meta`.
// Any lookup/parse error from get_rowset_meta() is returned as-is. If the JSON conversion
// fails, SERIALIZE_PROTOBUF_ERROR is returned.
Status RowsetMetaManager::get_json_rowset_meta(OlapMeta* meta, TabletUid tablet_uid,
                                               const RowsetId& rowset_id,
                                               std::string* json_rowset_meta) {
    RowsetMetaSharedPtr loaded_meta(new (std::nothrow) RowsetMeta());
    RETURN_IF_ERROR(get_rowset_meta(meta, tablet_uid, rowset_id, loaded_meta));
    if (loaded_meta->json_rowset_meta(json_rowset_meta)) {
        return Status::OK();
    }
    return Status::Error<SERIALIZE_PROTOBUF_ERROR>("get json rowset meta failed. rowset id:{}",
                                                   rowset_id.to_string());
}
// Persists a rowset meta. If `enable_binlog` is set, the binlog meta/data entries are
// written in the same batch (see _save_with_binlog). Otherwise only the rowset meta key
// is written (see _save).
// A non-positive partition id is currently only logged, not rejected.
Status RowsetMetaManager::save(OlapMeta* meta, TabletUid tablet_uid, const RowsetId& rowset_id,
                               const RowsetMetaPB& rowset_meta_pb, bool enable_binlog) {
    if (!(rowset_meta_pb.partition_id() > 0)) {
        LOG(WARNING) << "invalid partition id " << rowset_meta_pb.partition_id() << " tablet "
                     << rowset_meta_pb.tablet_id();
        // TODO(dx): after fix partition id eq 0 bug, fix it
        // return Status::InternalError("invalid partition id {} tablet {}",
        //         rowset_meta_pb.partition_id(), rowset_meta_pb.tablet_id());
    }
    // Test-only hook: forces the partition id of the (logically const) pb to 0.
    DBUG_EXECUTE_IF("RowsetMetaManager::save::zero_partition_id", {
        long old_partition_id = rowset_meta_pb.partition_id();
        auto& mutable_pb = const_cast<std::decay_t<decltype(rowset_meta_pb)>&>(rowset_meta_pb);
        mutable_pb.set_partition_id(0);
        LOG(WARNING) << "set debug point RowsetMetaManager::save::zero_partition_id old="
                     << old_partition_id << " new=" << rowset_meta_pb.DebugString();
    });
    return enable_binlog ? _save_with_binlog(meta, tablet_uid, rowset_id, rowset_meta_pb)
                         : _save(meta, tablet_uid, rowset_id, rowset_meta_pb);
}
// Serializes `rowset_meta_pb` and stores it under "<ROWSET_PREFIX><tablet_uid>_<rowset_id>".
// Returns SERIALIZE_PROTOBUF_ERROR if serialization fails, otherwise the put() status.
Status RowsetMetaManager::_save(OlapMeta* meta, TabletUid tablet_uid, const RowsetId& rowset_id,
                                const RowsetMetaPB& rowset_meta_pb) {
    const std::string rowset_key =
            fmt::format("{}{}_{}", ROWSET_PREFIX, tablet_uid.to_string(), rowset_id.to_string());
    std::string serialized;
    if (rowset_meta_pb.SerializeToString(&serialized)) {
        return meta->put(META_COLUMN_FAMILY_INDEX, rowset_key, serialized);
    }
    return Status::Error<SERIALIZE_PROTOBUF_ERROR>("serialize rowset pb failed. rowset id:{}",
                                                   rowset_key);
}
// Persists a rowset meta together with its binlog entries in one batched put
// (presumably atomic; confirm against OlapMeta::put). Three keys are written:
//   * rowset meta key -> serialized RowsetMetaPB
//   * binlog meta key -> serialized BinlogMetaEntryPB (version, tablet/rowset ids,
//                        segment count, creation time)
//   * binlog data key -> serialized RowsetMetaPB (same bytes as the rowset meta value)
// Key layouts:
//   binlog_meta_key: {kBinlogPrefix}meta_{tablet_uid}_{version}_{rowset_id}
//   binlog_data_key: {kBinlogPrefix}data_{tablet_uid}_{version}_{rowset_id}
// The version is formatted to 20 bytes so that keys sort by version.
// Only single-version rowsets are supported. A rowset whose start and end versions differ
// (a cumulative rowset) yields ROWSET_BINLOG_NOT_ONLY_ONE_VERSION.
Status RowsetMetaManager::_save_with_binlog(OlapMeta* meta, TabletUid tablet_uid,
                                            const RowsetId& rowset_id,
                                            const RowsetMetaPB& rowset_meta_pb) {
    // Step 1: rowset meta key/value.
    std::string rowset_key =
            fmt::format("{}{}_{}", ROWSET_PREFIX, tablet_uid.to_string(), rowset_id.to_string());
    std::string rowset_bytes;
    if (!rowset_meta_pb.SerializeToString(&rowset_bytes)) {
        return Status::Error<SERIALIZE_PROTOBUF_ERROR>("serialize rowset pb failed. rowset id:{}",
                                                       rowset_key);
    }

    // Step 2: binlog keys are only defined for a single version.
    const auto single_version = rowset_meta_pb.start_version();
    if (single_version != rowset_meta_pb.end_version()) {
        return Status::Error<ROWSET_BINLOG_NOT_ONLY_ONE_VERSION>(
                "binlog key is not supported for cumulative rowset. rowset id:{}", rowset_key);
    }
    std::string meta_key = make_binlog_meta_key(tablet_uid, single_version, rowset_id);
    std::string data_key = make_binlog_data_key(tablet_uid, single_version, rowset_id);

    BinlogMetaEntryPB entry_pb;
    entry_pb.set_version(single_version);
    entry_pb.set_tablet_id(rowset_meta_pb.tablet_id());
    entry_pb.set_rowset_id(rowset_meta_pb.rowset_id());
    entry_pb.set_num_segments(rowset_meta_pb.num_segments());
    entry_pb.set_creation_time(rowset_meta_pb.creation_time());
    entry_pb.set_rowset_id_v2(rowset_meta_pb.rowset_id_v2());
    std::string entry_bytes;
    if (!entry_pb.SerializeToString(&entry_bytes)) {
        return Status::Error<SERIALIZE_PROTOBUF_ERROR>("serialize binlog pb failed. rowset id:{}",
                                                       meta_key);
    }

    // Step 3: write everything in one batch. The binlog data entry reuses the rowset bytes.
    std::vector<OlapMeta::BatchEntry> batch = {{std::cref(rowset_key), std::cref(rowset_bytes)},
                                               {std::cref(meta_key), std::cref(entry_bytes)},
                                               {std::cref(data_key), std::cref(rowset_bytes)}};
    return meta->put(META_COLUMN_FAMILY_INDEX, batch);
}
// Builds the binlog segment file names ("<rowset_id>_<i>.dat" for every segment i) of the
// rowset recorded under the binlog filename prefix for `binlog_version`.
// Only the first key under the prefix is inspected. The rowset id is taken from the text
// after the key's last '_', and the segment count is taken from its BinlogMetaEntryPB value.
// Returns an empty list when `segment_idx` is not below the segment count (including when
// the lookup failed and the count stayed -1). Lookup failures are only logged.
std::vector<std::string> RowsetMetaManager::get_binlog_filenames(OlapMeta* meta,
                                                                 TabletUid tablet_uid,
                                                                 std::string_view binlog_version,
                                                                 int64_t segment_idx) {
    auto prefix_key = make_binlog_filename_key(tablet_uid, binlog_version);
    VLOG_DEBUG << fmt::format("prefix_key:{}", prefix_key);

    std::vector<std::string> binlog_files;
    std::string found_rowset_id;
    int64_t found_num_segments = -1;
    // Always returns false, so iterate() stops after the first visited entry.
    auto read_first_entry = [&found_rowset_id, &found_num_segments](
                                    std::string_view key, std::string_view value) -> bool {
        VLOG_DEBUG << fmt::format("key:{}, value:{}", key, value);
        // e.g. 'binlog_meta_6943f1585fe834b5-e542c2b83a21d0b7_00000000000000000069_020000000000000135449d7cd7eadfe672aa0f928fa99593'
        // -> rowset id '020000000000000135449d7cd7eadfe672aa0f928fa99593'
        if (!starts_with_binlog_meta(key)) {
            LOG(WARNING) << fmt::format("invalid binlog meta key:{}", key);
            return false;
        }
        const auto last_sep = key.rfind('_');
        if (last_sep == std::string_view::npos) {
            LOG(WARNING) << fmt::format("invalid binlog meta key:{}", key);
            return false;
        }
        found_rowset_id = key.substr(last_sep + 1);
        BinlogMetaEntryPB entry_pb;
        if (!entry_pb.ParseFromArray(value.data(), value.size())) {
            LOG(WARNING) << fmt::format("invalid binlog meta value:{}", value);
            return false;
        }
        found_num_segments = entry_pb.num_segments();
        return false;
    };

    Status iterate_status = meta->iterate(META_COLUMN_FAMILY_INDEX, prefix_key, read_first_entry);
    const bool lookup_failed =
            !iterate_status.ok() || found_rowset_id.empty() || found_num_segments < 0;
    if (lookup_failed) {
        LOG(WARNING) << fmt::format(
                "fail to get binlog filename. tablet uid:{}, binlog version:{}, status:{}, "
                "rowset_id:{}, num_segments:{}",
                tablet_uid.to_string(), binlog_version, iterate_status.to_string(),
                found_rowset_id, found_num_segments);
    }

    if (!(segment_idx < found_num_segments)) {
        LOG(WARNING) << fmt::format("invalid segment idx:{}, num_segments:{}", segment_idx,
                                    found_num_segments);
        return binlog_files;
    }
    for (int64_t seg = 0; seg < found_num_segments; ++seg) {
        // TODO(Drogon): Update to filesystem path
        binlog_files.emplace_back(fmt::format("{}_{}.dat", found_rowset_id, seg));
    }
    return binlog_files;
}
// Returns {rowset_id, num_segments} for the binlog recorded under the binlog filename
// prefix of `binlog_version`.
// Only the first key under the prefix is inspected. The rowset id is the text after the
// key's last '_', and the segment count comes from the BinlogMetaEntryPB value.
// On any failure (iterate error, key without the binlog meta prefix or without '_',
// unparseable value) a warning is logged and {"", -1} is returned. Both fields are
// published only together, from a valid entry.
std::pair<std::string, int64_t> RowsetMetaManager::get_binlog_info(
        OlapMeta* meta, TabletUid tablet_uid, std::string_view binlog_version) {
    VLOG_DEBUG << fmt::format("tablet_uid:{}, binlog_version:{}", tablet_uid.to_string(),
                              binlog_version);
    auto prefix_key = make_binlog_filename_key(tablet_uid, binlog_version);
    VLOG_DEBUG << fmt::format("prefix_key:{}", prefix_key);
    std::string rowset_id;
    int64_t num_segments = -1;
    // Always returns false, so iterate() stops after the first visited entry.
    auto traverse_func = [&rowset_id, &num_segments](std::string_view key,
                                                     std::string_view value) -> bool {
        VLOG_DEBUG << fmt::format("key:{}, value:{}", key, value);
        // key is 'binlog_meta_6943f1585fe834b5-e542c2b83a21d0b7_00000000000000000069_020000000000000135449d7cd7eadfe672aa0f928fa99593', extract last part '020000000000000135449d7cd7eadfe672aa0f928fa99593'
        // Same key validation as get_binlog_filenames().
        if (!starts_with_binlog_meta(key)) {
            LOG(WARNING) << fmt::format("invalid binlog meta key:{}", key);
            return false;
        }
        auto pos = key.rfind('_');
        if (pos == std::string_view::npos) {
            LOG(WARNING) << fmt::format("invalid binlog meta key:{}", key);
            return false;
        }
        BinlogMetaEntryPB binlog_meta_entry_pb;
        // Previously the parse result was ignored, so a corrupt value produced a bogus
        // segment count paired with a valid-looking rowset id.
        if (!binlog_meta_entry_pb.ParseFromArray(value.data(), value.size())) {
            LOG(WARNING) << fmt::format("invalid binlog meta value:{}", value);
            return false;
        }
        rowset_id = key.substr(pos + 1);
        num_segments = binlog_meta_entry_pb.num_segments();
        return false;
    };
    // get binlog meta by prefix
    Status status = meta->iterate(META_COLUMN_FAMILY_INDEX, prefix_key, traverse_func);
    if (!status.ok() || rowset_id.empty() || num_segments < 0) {
        LOG(WARNING) << fmt::format(
                "fail to get binlog filename. tablet uid:{}, binlog version:{}, status:{}, "
                "rowset_id:{}, num_segments:{}",
                tablet_uid.to_string(), binlog_version, status.to_string(), rowset_id,
                num_segments);
    }
    return std::make_pair(rowset_id, num_segments);
}
// Returns the raw value stored under the binlog *data* key for (tablet_uid, binlog_version,
// rowset_id), or an empty string if the lookup fails (the failure is logged).
std::string RowsetMetaManager::get_rowset_binlog_meta(OlapMeta* meta, TabletUid tablet_uid,
                                                      std::string_view binlog_version,
                                                      std::string_view rowset_id) {
    auto data_key = make_binlog_data_key(tablet_uid.to_string(), binlog_version, rowset_id);
    VLOG_DEBUG << fmt::format("get binlog_meta_key:{}", data_key);
    std::string stored_value;
    if (Status get_status = meta->get(META_COLUMN_FAMILY_INDEX, data_key, &stored_value);
        !get_status.ok()) {
        LOG(WARNING) << fmt::format(
                "fail to get binlog meta. tablet uid:{}, binlog version:{}, "
                "rowset_id:{}, status:{}",
                tablet_uid.to_string(), binlog_version, rowset_id, get_status.to_string());
        return "";
    }
    return stored_value;
}
// Collects rowset binlog metas of a tablet into `metas_pb`.
// An empty `binlog_versions` means "every binlog of the tablet"; otherwise only the
// listed versions are fetched.
Status RowsetMetaManager::get_rowset_binlog_metas(OlapMeta* meta, const TabletUid tablet_uid,
                                                  const std::vector<int64_t>& binlog_versions,
                                                  RowsetBinlogMetasPB* metas_pb) {
    return binlog_versions.empty()
                   ? _get_all_rowset_binlog_metas(meta, tablet_uid, metas_pb)
                   : _get_rowset_binlog_metas(meta, tablet_uid, binlog_versions, metas_pb);
}
// For each requested version, scans from that version's binlog meta key prefix and
// handles only the first entry found (the callback always returns false).
// For a handled entry: parse the BinlogMetaEntryPB, append a RowsetBinlogMetaPB carrying
// the meta key/value, then read the matching binlog data key and attach it as well.
// Returns the first iterate() failure, or the first failure recorded by the callback
// (bad key prefix, unparseable value, failed data lookup); OK otherwise.
Status RowsetMetaManager::_get_rowset_binlog_metas(OlapMeta* meta, const TabletUid tablet_uid,
                                                   const std::vector<int64_t>& binlog_versions,
                                                   RowsetBinlogMetasPB* metas_pb) {
    Status visit_status;
    auto uid_str = tablet_uid.to_string();
    auto collect_first_entry = [meta, metas_pb, &visit_status, &uid_str](
                                       std::string_view key, std::string_view value) -> bool {
        VLOG_DEBUG << fmt::format("key:{}, value:{}", key, value);
        if (!starts_with_binlog_meta(key)) {
            auto err_msg = fmt::format("invalid binlog meta key:{}", key);
            visit_status = Status::InternalError(err_msg);
            LOG(WARNING) << err_msg;
            return false;
        }
        BinlogMetaEntryPB entry_pb;
        if (!entry_pb.ParseFromArray(value.data(), value.size())) {
            auto err_msg = fmt::format("fail to parse binlog meta value:{}", value);
            visit_status = Status::InternalError(err_msg);
            LOG(WARNING) << err_msg;
            return false;
        }
        const auto& rs_id = entry_pb.rowset_id_v2();
        auto* out = metas_pb->add_rowset_binlog_metas();
        out->set_rowset_id(rs_id);
        out->set_version(entry_pb.version());
        out->set_num_segments(entry_pb.num_segments());
        out->set_meta_key(std::string {key});
        out->set_meta(std::string {value});

        auto data_key = make_binlog_data_key(uid_str, entry_pb.version(), rs_id);
        std::string data_value;
        visit_status = meta->get(META_COLUMN_FAMILY_INDEX, data_key, &data_value);
        if (!visit_status.ok()) {
            LOG(WARNING) << visit_status.to_string();
            return false;
        }
        out->set_data_key(data_key);
        out->set_data(data_value);
        return false;
    };

    for (const int64_t version : binlog_versions) {
        auto prefix_key = make_binlog_meta_key_prefix(tablet_uid, version);
        if (Status iter_status =
                    meta->iterate(META_COLUMN_FAMILY_INDEX, prefix_key, collect_first_entry);
            !iter_status.ok()) {
            LOG(WARNING) << fmt::format("fail to iterate binlog meta. prefix_key:{}, status:{}",
                                        prefix_key, iter_status.to_string());
            return iter_status;
        }
        if (!visit_status.ok()) {
            return visit_status;
        }
    }
    return visit_status;
}
// Collects the binlog metas (and their data) of a tablet for the version range `version`
// into `metas_pb`.
// The scan starts at the meta key prefix of version.first, restricted to the tablet's
// meta key prefix. It stops at the first key that compares greater than the meta key
// prefix of version.second + 1 (binlog meta keys are binary comparable). Every entry
// before that is appended, including its binlog data value.
// Returns the iterate() error if the scan fails; otherwise the last status recorded by
// the callback (bad key prefix, unparseable value, failed data lookup).
Status RowsetMetaManager::get_rowset_binlog_metas(OlapMeta* meta, TabletUid tablet_uid,
                                                  Version version, RowsetBinlogMetasPB* metas_pb) {
    Status visit_status;
    auto uid_str = tablet_uid.to_string();
    auto prefix_key = make_binlog_meta_key_prefix(tablet_uid);
    auto begin_key = make_binlog_meta_key_prefix(tablet_uid, version.first);
    auto end_key = make_binlog_meta_key_prefix(tablet_uid, version.second + 1);

    auto collect_in_range = [meta, metas_pb, &visit_status, &uid_str, &end_key](
                                    std::string_view key, std::string_view value) -> bool {
        VLOG_DEBUG << fmt::format("get rowset binlog metas, key={}, value={}", key, value);
        // Past the requested range: everything wanted has been scanned.
        if (key.compare(end_key) > 0) {
            return false;
        }
        if (!starts_with_binlog_meta(key)) {
            auto err_msg = fmt::format("invalid binlog meta key:{}", key);
            visit_status = Status::InternalError(err_msg);
            LOG(WARNING) << err_msg;
            return false;
        }
        BinlogMetaEntryPB entry_pb;
        if (!entry_pb.ParseFromArray(value.data(), value.size())) {
            auto err_msg = fmt::format("fail to parse binlog meta value:{}", value);
            visit_status = Status::InternalError(err_msg);
            LOG(WARNING) << err_msg;
            return false;
        }
        const auto& rs_id = entry_pb.rowset_id_v2();
        auto* out = metas_pb->add_rowset_binlog_metas();
        out->set_rowset_id(rs_id);
        out->set_version(entry_pb.version());
        out->set_num_segments(entry_pb.num_segments());
        out->set_meta_key(std::string {key});
        out->set_meta(std::string {value});

        auto data_key = make_binlog_data_key(uid_str, entry_pb.version(), rs_id);
        std::string data_value;
        visit_status = meta->get(META_COLUMN_FAMILY_INDEX, data_key, &data_value);
        if (!visit_status.ok()) {
            LOG(WARNING) << visit_status.to_string();
            return false;
        }
        out->set_data_key(data_key);
        out->set_data(data_value);
        // Keep scanning.
        return true;
    };

    Status iter_status =
            meta->iterate(META_COLUMN_FAMILY_INDEX, begin_key, prefix_key, collect_in_range);
    if (iter_status.ok()) {
        return visit_status;
    }
    LOG(WARNING) << fmt::format(
            "fail to iterate binlog meta. prefix_key:{}, version:{}, status:{}", prefix_key,
            version.to_string(), iter_status.to_string());
    return iter_status;
}
// Collects every binlog meta (and its data) found under the tablet's binlog meta key
// prefix into `metas_pb`.
// The scan ends early at the first key lacking the binlog meta prefix, or at the first
// entry whose tablet_id differs from the first entry seen.
// Returns the iterate() error if the scan fails; otherwise the last status recorded by
// the callback (unparseable value or failed data lookup).
Status RowsetMetaManager::_get_all_rowset_binlog_metas(OlapMeta* meta, const TabletUid tablet_uid,
                                                       RowsetBinlogMetasPB* metas_pb) {
    Status visit_status;
    auto uid_str = tablet_uid.to_string();
    // 0 means "not seen yet"; fixed by the first parsed entry.
    int64_t first_tablet_id = 0;
    auto collect_all = [meta, metas_pb, &visit_status, &uid_str, &first_tablet_id](
                               std::string_view key, std::string_view value) -> bool {
        VLOG_DEBUG << fmt::format("key:{}, value:{}", key, value);
        if (!starts_with_binlog_meta(key)) {
            LOG(INFO) << fmt::format("end scan binlog meta. key:{}", key);
            return false;
        }
        BinlogMetaEntryPB entry_pb;
        if (!entry_pb.ParseFromArray(value.data(), value.size())) {
            auto err_msg = fmt::format("fail to parse binlog meta value:{}", value);
            visit_status = Status::InternalError(err_msg);
            LOG(WARNING) << err_msg;
            return false;
        }
        if (first_tablet_id == 0) {
            first_tablet_id = entry_pb.tablet_id();
        } else if (first_tablet_id != entry_pb.tablet_id()) {
            // All entries of this scan are expected to belong to one tablet; stop otherwise.
            return false;
        }
        const auto& rs_id = entry_pb.rowset_id_v2();
        auto* out = metas_pb->add_rowset_binlog_metas();
        out->set_rowset_id(rs_id);
        out->set_version(entry_pb.version());
        out->set_num_segments(entry_pb.num_segments());
        out->set_meta_key(std::string {key});
        out->set_meta(std::string {value});

        auto data_key = make_binlog_data_key(uid_str, entry_pb.version(), rs_id);
        std::string data_value;
        visit_status = meta->get(META_COLUMN_FAMILY_INDEX, data_key, &data_value);
        if (!visit_status.ok()) {
            LOG(WARNING) << visit_status;
            return false;
        }
        out->set_data_key(data_key);
        out->set_data(data_value);
        return true;
    };

    auto prefix_key = make_binlog_meta_key_prefix(tablet_uid);
    Status iter_status = meta->iterate(META_COLUMN_FAMILY_INDEX, prefix_key, collect_all);
    if (iter_status.ok()) {
        return visit_status;
    }
    LOG(WARNING) << fmt::format("fail to iterate binlog meta. prefix_key:{}, status:{}",
                                prefix_key, iter_status.to_string());
    return iter_status;
}
// Deletes the rowset meta key "<ROWSET_PREFIX><tablet_uid>_<rowset_id>" and returns the
// status of the removal.
Status RowsetMetaManager::remove(OlapMeta* meta, TabletUid tablet_uid, const RowsetId& rowset_id) {
    const std::string rowset_key =
            fmt::format("{}{}_{}", ROWSET_PREFIX, tablet_uid.to_string(), rowset_id.to_string());
    VLOG_NOTICE << "start to remove rowset, key:" << rowset_key;
    Status remove_status = meta->remove(META_COLUMN_FAMILY_INDEX, rowset_key);
    VLOG_NOTICE << "remove rowset key:" << rowset_key << " finished";
    return remove_status;
}
// Removes both the binlog meta key and the binlog data key that share `suffix`
// (i.e. kBinlogMetaPrefix + suffix and kBinlogDataPrefix + suffix) in one call.
Status RowsetMetaManager::remove_binlog(OlapMeta* meta, const std::string& suffix) {
    // Please do not remove std::vector<std::string>, more info refer to pr#23190
    std::vector<std::string> keys_to_remove {kBinlogMetaPrefix.data() + suffix,
                                             kBinlogDataPrefix.data() + suffix};
    return meta->remove(META_COLUMN_FAMILY_INDEX, keys_to_remove);
}
// Writes externally supplied binlog metas into this tablet's keyspace.
// For each entry, the meta/data keys are regenerated for `tablet_uid` and stored back
// into `metas_pb` (so callers see the keys actually written). Then all (key, value)
// pairs are put in one batch. The batch entries reference strings owned by `metas_pb`,
// so `metas_pb` must not be modified before put() returns.
Status RowsetMetaManager::ingest_binlog_metas(OlapMeta* meta, TabletUid tablet_uid,
                                              RowsetBinlogMetasPB* metas_pb) {
    const auto uid_str = tablet_uid.to_string();
    std::vector<OlapMeta::BatchEntry> batch;
    for (auto& binlog_meta : *metas_pb->mutable_rowset_binlog_metas()) {
        const auto& rs_id = binlog_meta.rowset_id();
        const auto ver = binlog_meta.version();
        auto* meta_key = binlog_meta.mutable_meta_key();
        auto* data_key = binlog_meta.mutable_data_key();
        *meta_key = make_binlog_meta_key(uid_str, ver, rs_id);
        *data_key = make_binlog_data_key(uid_str, ver, rs_id);
        batch.emplace_back(*meta_key, binlog_meta.meta());
        batch.emplace_back(*data_key, binlog_meta.data());
    }
    return meta->put(META_COLUMN_FAMILY_INDEX, batch);
}
// Visits every rowset meta entry under ROWSET_PREFIX and calls
// `func(tablet_uid, rowset_id, serialized_meta)` for each well-formed key.
// Expected key format: rst_{uid_hi}-{uid_lo}_{rowset_id} (the tablet uid is split on '-').
// Malformed keys are logged and skipped (the callback returns true so iteration continues).
// The value returned by `func` is forwarded to iterate().
Status RowsetMetaManager::traverse_rowset_metas(
        OlapMeta* meta,
        std::function<bool(const TabletUid&, const RowsetId&, std::string_view)> const& func) {
    auto traverse_rowset_meta_func = [&func](std::string_view key, std::string_view value) -> bool {
        std::vector<std::string> parts;
        // key format: rst_uuid_rowset_id
        RETURN_IF_ERROR(split_string(key, '_', &parts));
        if (parts.size() != 3) {
            LOG(WARNING) << "invalid rowset key:" << key << ", splitted size:" << parts.size();
            return true;
        }
        RowsetId rowset_id;
        rowset_id.init(parts[2]);
        std::vector<std::string> uid_parts;
        RETURN_IF_ERROR(split_string(parts[1], '-', &uid_parts));
        // A uid segment without '-' would otherwise make uid_parts[1] an out-of-bounds access.
        if (uid_parts.size() < 2) {
            LOG(WARNING) << "invalid tablet uid in rowset key:" << key
                         << ", splitted size:" << uid_parts.size();
            return true;
        }
        TabletUid tablet_uid(uid_parts[0], uid_parts[1]);
        return func(tablet_uid, rowset_id, value);
    };
    return meta->iterate(META_COLUMN_FAMILY_INDEX, ROWSET_PREFIX, traverse_rowset_meta_func);
}
// Walks all keys under kBinlogMetaPrefix, grouped by the text between that prefix and the
// next '_' (the tablet uid for well-formed keys), and hands them to
// `collector(key, value, is_first_key_of_group)`.
//
// For the first key of each group the collector is called with `true`. Its return value
// decides whether the remaining keys of that group are also passed to it (with `false`).
// When the collector declines a group, the current iterate() pass stops and a new pass
// starts from "<group prefix>~", which is meant to seek past the rest of that group.
// Passes repeat until one visits no key or an iterate() call fails.
//
// Returns the status of the last iterate() call.
Status RowsetMetaManager::traverse_binlog_metas(
        OlapMeta* meta,
        std::function<bool(std::string_view, std::string_view, bool)> const& collector) {
    // last_info = {start key for the next iterate() pass, whether the current group is collected}.
    // The first pass starts at the bare binlog meta prefix.
    std::pair<std::string, bool> last_info = std::make_pair(kBinlogMetaPrefix.data(), false);
    // Set when the current iterate() pass visited at least one key.
    bool seek_found = false;
    Status status;
    auto traverse_binlog_meta_func = [&last_info, &seek_found, &collector](
                                             std::string_view key, std::string_view value) -> bool {
        seek_found = true;
        auto& [last_prefix, need_collect] = last_info;
        // Group id = key up to (not including) the first '_' after the binlog meta prefix.
        size_t pos = key.find('_', kBinlogMetaPrefix.size());
        if (pos == std::string::npos) {
            // Malformed key: log it and keep iterating.
            LOG(WARNING) << "invalid binlog meta key: " << key;
            return true;
        }
        std::string_view key_view(key.data(), pos);
        // last_prefix carries one trailing sentinel character ('~' after the first group,
        // the final '_' of the bare prefix initially). Drop it before comparing group ids.
        std::string_view last_prefix_view(last_prefix.data(), last_prefix.size() - 1);
        if (last_prefix_view != key_view) {
            // First key of a new group: the collector decides whether to take this group.
            need_collect = collector(key, value, true);
            // '~' sorts after '_' and the hex/'-' characters of tablet uids, so seeking here
            // skips the rest of the group (assumes byte-wise key order — confirm).
            last_prefix = std::string(key_view) + "~";
        } else if (need_collect) {
            collector(key, value, false);
        }
        // Returning false ends this pass. The outer loop then re-seeks from last_prefix.
        return need_collect;
    };
    do {
        seek_found = false;
        status = meta->iterate(META_COLUMN_FAMILY_INDEX, last_info.first, kBinlogMetaPrefix.data(),
                               traverse_binlog_meta_func);
    } while (status.ok() && seek_found);
    return status;
}
// Reads a JSON-encoded rowset meta from `rowset_meta_path` and saves it (without binlog)
// under the tablet uid / rowset id it contains.
// Lines are concatenated without separators, and surrounding whitespace is trimmed before
// parsing.
// Returns:
//   IO_ERROR                 if the file cannot be opened or a read error occurs,
//   SERIALIZE_PROTOBUF_ERROR if the JSON cannot be parsed,
//   otherwise the status of save().
Status RowsetMetaManager::load_json_rowset_meta(OlapMeta* meta,
                                                const std::string& rowset_meta_path) {
    std::ifstream infile(rowset_meta_path);
    // Without this check the former `while (!eof())` loop spun forever on a missing file:
    // failbit is set but eof never is.
    if (!infile.is_open()) {
        return Status::Error<IO_ERROR>("fail to open rowset meta file: {}", rowset_meta_path);
    }
    std::string json_rowset_meta;
    std::string line;
    // std::getline into std::string has no line-length limit. The old fixed 1024-byte
    // buffer set failbit on longer lines and then looped forever.
    while (std::getline(infile, line)) {
        json_rowset_meta += line;
    }
    if (infile.bad()) {
        return Status::Error<IO_ERROR>("fail to read rowset meta file: {}", rowset_meta_path);
    }
    boost::algorithm::trim(json_rowset_meta);
    RowsetMeta rowset_meta;
    bool ret = rowset_meta.init_from_json(json_rowset_meta);
    if (!ret) {
        return Status::Error<SERIALIZE_PROTOBUF_ERROR>("parse json rowset meta failed.");
    }
    RowsetId rowset_id = rowset_meta.rowset_id();
    TabletUid tablet_uid = rowset_meta.tablet_uid();
    return save(meta, tablet_uid, rowset_id, rowset_meta.get_rowset_pb(), false);
}
// Serializes `partial_update_info_pb` and stores it under
// "{PARTIAL_UPDATE_INFO_PREFIX}{tablet_id}_{partition_id}_{txn_id}".
// Returns SERIALIZE_PROTOBUF_ERROR if serialization fails, otherwise the put() status.
Status RowsetMetaManager::save_partial_update_info(
        OlapMeta* meta, int64_t tablet_id, int64_t partition_id, int64_t txn_id,
        const PartialUpdateInfoPB& partial_update_info_pb) {
    const std::string info_key =
            fmt::format("{}{}_{}_{}", PARTIAL_UPDATE_INFO_PREFIX, tablet_id, partition_id, txn_id);
    std::string serialized;
    if (partial_update_info_pb.SerializeToString(&serialized)) {
        VLOG_NOTICE << "save partial update info, key=" << info_key
                    << ", value_size=" << serialized.size();
        return meta->put(META_COLUMN_FAMILY_INDEX, info_key, serialized);
    }
    return Status::Error<SERIALIZE_PROTOBUF_ERROR>(
            "serialize partial update info failed. key={}", info_key);
}
// Loads and parses the partial update info for (tablet_id, partition_id, txn_id).
// Returns:
//   META_KEY_NOT_FOUND (unlogged) if no entry exists,
//   the lookup status (logged) for any other lookup failure,
//   PARSE_PROTOBUF_ERROR if the stored bytes cannot be parsed,
//   OK otherwise, with `partial_update_info_pb` filled in.
Status RowsetMetaManager::try_get_partial_update_info(OlapMeta* meta, int64_t tablet_id,
                                                      int64_t partition_id, int64_t txn_id,
                                                      PartialUpdateInfoPB* partial_update_info_pb) {
    const std::string info_key =
            fmt::format("{}{}_{}_{}", PARTIAL_UPDATE_INFO_PREFIX, tablet_id, partition_id, txn_id);
    std::string raw_info;
    Status get_status = meta->get(META_COLUMN_FAMILY_INDEX, info_key, &raw_info);
    if (!get_status.ok()) {
        // A missing key is an expected outcome and is passed through silently.
        if (!get_status.is<META_KEY_NOT_FOUND>()) {
            LOG_WARNING("failed to get partial update info. tablet_id={}, partition_id={}, txn_id={}",
                        tablet_id, partition_id, txn_id);
        }
        return get_status;
    }
    if (partial_update_info_pb->ParseFromString(raw_info)) {
        return Status::OK();
    }
    return Status::Error<ErrorCode::PARSE_PROTOBUF_ERROR>(
            "fail to parse partial update info content to protobuf object. tablet_id={}, "
            "partition_id={}, txn_id={}",
            tablet_id, partition_id, txn_id);
}
// Visits every partial update info entry (keys under PARTIAL_UPDATE_INFO_PREFIX) and calls
// `func(tablet_id, partition_id, txn_id, serialized_value)`.
// Keys that do not split into exactly four '_'-separated parts are logged and skipped.
// The numeric fields are parsed with std::stoll, which throws on non-numeric text.
Status RowsetMetaManager::traverse_partial_update_info(
        OlapMeta* meta,
        std::function<bool(int64_t, int64_t, int64_t, std::string_view)> const& func) {
    auto parse_and_dispatch = [&func](std::string_view key, std::string_view value) -> bool {
        // key format: pui_{tablet_id}_{partition_id}_{txn_id}
        std::vector<std::string> fields;
        RETURN_IF_ERROR(split_string(key, '_', &fields));
        if (fields.size() != 4) {
            LOG_WARNING("invalid rowset key={}, splitted size={}", key, fields.size());
            return true;
        }
        const int64_t parsed_tablet_id = std::stoll(fields[1]);
        const int64_t parsed_partition_id = std::stoll(fields[2]);
        const int64_t parsed_txn_id = std::stoll(fields[3]);
        return func(parsed_tablet_id, parsed_partition_id, parsed_txn_id, value);
    };
    return meta->iterate(META_COLUMN_FAMILY_INDEX, PARTIAL_UPDATE_INFO_PREFIX, parse_and_dispatch);
}
// Deletes the partial update info entry for (tablet_id, partition_id, txn_id) and returns
// the status of the removal.
Status RowsetMetaManager::remove_partial_update_info(OlapMeta* meta, int64_t tablet_id,
                                                     int64_t partition_id, int64_t txn_id) {
    const std::string info_key =
            fmt::format("{}{}_{}_{}", PARTIAL_UPDATE_INFO_PREFIX, tablet_id, partition_id, txn_id);
    Status remove_status = meta->remove(META_COLUMN_FAMILY_INDEX, info_key);
    VLOG_NOTICE << "remove partial update info, key=" << info_key;
    return remove_status;
}
// Deletes the partial update info entries for every (tablet_id, partition_id, txn_id)
// tuple in `keys` with a single remove() call and returns its status.
Status RowsetMetaManager::remove_partial_update_infos(
        OlapMeta* meta, const std::vector<std::tuple<int64_t, int64_t, int64_t>>& keys) {
    std::vector<std::string> info_keys;
    info_keys.reserve(keys.size());
    for (const auto& [tid, pid, xid] : keys) {
        info_keys.emplace_back(
                fmt::format("{}{}_{}_{}", PARTIAL_UPDATE_INFO_PREFIX, tid, pid, xid));
    }
    Status remove_status = meta->remove(META_COLUMN_FAMILY_INDEX, info_keys);
    VLOG_NOTICE << "remove partial update info, remove_keys.size()=" << info_keys.size();
    return remove_status;
}
// Removes every partial update info entry belonging to `tablet_id`.
// Keys have the form {PARTIAL_UPDATE_INFO_PREFIX}{tablet_id}_{partition_id}_{txn_id}.
// Matching keys are collected by a prefix scan, then deleted in one remove() call.
// Returns the iterate() error if the scan fails, otherwise the remove() status
// (OK when nothing matched).
Status RowsetMetaManager::remove_tablet_related_partial_update_info(OlapMeta* meta,
                                                                    int64_t tablet_id) {
    // The trailing '_' terminates the tablet id. Without it the prefix for tablet 1
    // ("pui_1") also matches the keys of tablets 10, 12, 123, ... and would delete them.
    std::string prefix = fmt::format("{}{}_", PARTIAL_UPDATE_INFO_PREFIX, tablet_id);
    std::vector<std::string> remove_keys;
    auto get_remove_keys_func = [&remove_keys](std::string_view key,
                                               std::string_view /*value*/) -> bool {
        remove_keys.emplace_back(key);
        return true;
    };
    RETURN_IF_ERROR(meta->iterate(META_COLUMN_FAMILY_INDEX, prefix, get_remove_keys_func));
    // Logged after the scan so the reported size reflects the keys actually collected.
    VLOG_NOTICE << "remove tablet related partial update info, tablet_id: " << tablet_id
                << " removed keys size: " << remove_keys.size();
    if (remove_keys.empty()) {
        return Status::OK();
    }
    return meta->remove(META_COLUMN_FAMILY_INDEX, remove_keys);
}
} // namespace doris