blob: 0ada3eca56d1432e7c7974fe7777e06d9763d12c [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "blob_message.h"
#include <butil/iobuf.h>
#include <butil/iobuf_inl.h>
#include <google/protobuf/message.h>
#include "common/logging.h"
#include "common/util.h"
#include "meta-store/codec.h"
#include "meta-store/txn_kv.h"
namespace doris::cloud {
static std::vector<std::string_view> split_string(const std::string_view& str, int n) {
std::vector<std::string_view> substrings;
for (size_t i = 0; i < str.size(); i += n) {
substrings.push_back(str.substr(i, n));
}
return substrings;
}
bool ValueBuf::to_pb(google::protobuf::Message* pb) const {
butil::IOBuf merge;
for (auto&& it : iters) {
it->reset();
while (it->has_next()) {
auto [k, v] = it->next();
merge.append_user_data((void*)v.data(), v.size(), +[](void*) {});
}
}
butil::IOBufAsZeroCopyInputStream merge_stream(merge);
return pb->ParseFromZeroCopyStream(&merge_stream);
}
std::string ValueBuf::value() const {
butil::IOBuf merge;
for (auto&& it : iters) {
it->reset();
while (it->has_next()) {
auto [k, v] = it->next();
merge.append_user_data((void*)v.data(), v.size(), +[](void*) {});
}
}
return merge.to_string();
}
std::vector<std::string> ValueBuf::keys() const {
std::vector<std::string> ret;
for (auto&& it : iters) {
it->reset();
while (it->has_next()) {
auto [k, _] = it->next();
ret.emplace_back(k.data(), k.size());
}
}
return ret;
}
void ValueBuf::remove(Transaction* txn) const {
for (auto&& it : iters) {
it->reset();
while (it->has_next()) {
txn->remove(it->next().first);
}
}
}
TxnErrorCode ValueBuf::get(Transaction* txn, std::string_view key, bool snapshot) {
iters.clear();
ver = -1;
std::string begin_key {key};
std::string end_key {key};
encode_int64(INT64_MAX, &end_key);
std::unique_ptr<RangeGetIterator> it;
TxnErrorCode err = txn->get(begin_key, end_key, &it, snapshot);
if (err != TxnErrorCode::TXN_OK) {
return err;
}
if (!it->has_next()) {
return TxnErrorCode::TXN_KEY_NOT_FOUND;
}
// Extract version
auto [k, _] = it->next();
if (k.size() == key.size()) { // Old version KV
DCHECK(k == key) << hex(k) << ' ' << hex(key);
DCHECK_EQ(it->size(), 1) << hex(k) << ' ' << hex(key);
ver = 0;
} else {
k.remove_prefix(key.size());
int64_t suffix;
if (decode_int64(&k, &suffix) != 0) [[unlikely]] {
LOG_WARNING("failed to decode key").tag("key", hex(k));
return TxnErrorCode::TXN_INVALID_DATA;
}
ver = suffix >> 56 & 0xff;
}
bool more = it->more();
if (!more) {
iters.push_back(std::move(it));
return TxnErrorCode::TXN_OK;
}
begin_key = it->next_begin_key();
iters.push_back(std::move(it));
do {
err = txn->get(begin_key, end_key, &it, snapshot);
if (err != TxnErrorCode::TXN_OK) {
return err;
}
more = it->more();
if (more) {
begin_key = it->next_begin_key();
}
iters.push_back(std::move(it));
} while (more);
return TxnErrorCode::TXN_OK;
}
TxnErrorCode blob_get(Transaction* txn, std::string_view key, ValueBuf* val, bool snapshot) {
return val->get(txn, key, snapshot);
}
void blob_put(Transaction* txn, std::string_view key, const google::protobuf::Message& pb,
uint8_t ver, size_t split_size) {
std::string value;
bool ret = pb.SerializeToString(&value); // Always success
DCHECK(ret) << hex(key) << ' ' << pb.ShortDebugString();
blob_put(txn, key, value, ver, split_size);
}
void blob_put(Transaction* txn, std::string_view key, std::string_view value, uint8_t ver,
size_t split_size) {
auto split_vec = split_string(value, split_size);
int64_t suffix_base = ver;
suffix_base <<= 56;
for (size_t i = 0; i < split_vec.size(); ++i) {
std::string k(key);
encode_int64(suffix_base + i, &k);
txn->put(k, split_vec[i]);
}
}
} // namespace doris::cloud