blob: 497f5c57a7022d6e0c79c5d4bf20324723bdc73b [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "txn_kv.h"
#include <bthread/countdown_event.h>
#include <byteswap.h>
#include <foundationdb/fdb_c_types.h>
#include <algorithm>
#include <atomic>
#include <bit>
#include <cstring>
#include <memory>
#include <optional>
#include <string>
#include <string_view>
#include <thread>
#include <variant>
#include <vector>
#include "common/bvars.h"
#include "common/config.h"
#include "common/defer.h"
#include "common/lexical_util.h"
#include "common/logging.h"
#include "common/stopwatch.h"
#include "common/util.h"
#include "cpp/sync_point.h"
#include "meta-store/keys.h"
#include "meta-store/txn_kv_error.h"
// =============================================================================
// FoundationDB implementation of TxnKv
// =============================================================================
// Evaluates `op` exactly once; if the resulting TxnErrorCode is not TXN_OK, returns
// that code from the enclosing function (which must therefore return TxnErrorCode).
// NOTE: the macro declares a local named `code`, so `op` must not itself refer to a
// caller variable named `code` (it would see the new, uninitialized local instead).
#define RETURN_IF_ERROR(op) \
do { \
TxnErrorCode code = op; \
if (code != TxnErrorCode::TXN_OK) return code; \
} while (false)
namespace doris::cloud {
static void may_logging_single_version_reading(std::string_view key) {
if (!config::enable_logging_for_single_version_reading) {
return;
}
if (!key.starts_with(CLOUD_USER_KEY_SPACE01)) {
return;
}
std::vector<std::string> single_version_meta_key_prefixs =
get_single_version_meta_key_prefixs();
if (std::none_of(single_version_meta_key_prefixs.begin(), single_version_meta_key_prefixs.end(),
[&key](const std::string& prefix) { return key.starts_with(prefix); })) {
return;
}
std::string_view tmp_key(key);
std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
tmp_key.remove_prefix(1); // Remove the space tag.
if (decode_key(&tmp_key, &out) != 0) {
LOG(WARNING) << "Read single version meta key: " << hex(key);
} else {
if (out.size() > 3) {
auto& first = std::get<0>(out[0]);
auto& third = std::get<0>(out[2]);
if (std::holds_alternative<std::string>(first) &&
std::get<std::string>(first) == "meta" &&
std::holds_alternative<std::string>(third)) {
auto& meta_subtype = std::get<std::string>(third);
for (auto&& v :
{"rowset_tmp", "delete_bitmap_lock", "delete_bitmap_pending", "schema"}) {
if (meta_subtype == v) {
return;
}
}
}
}
std::string value;
for (auto& [v, tag, pos] : out) {
if (std::holds_alternative<std::string>(v)) {
value.append(std::get<std::string>(v));
} else {
value.append(std::to_string(std::get<int64_t>(v)));
}
value.push_back(' ');
}
LOG(WARNING) << "Read single version meta key: \\x01 " << value << ", raw: " << hex(key);
}
}
// Translates a RangeKeySelector into the (or_equal, offset) pair that the FDB C API
// expects. Each pair matches the expansion of the corresponding FDB_KEYSEL_* macro.
static std::tuple<fdb_bool_t, int> apply_key_selector(RangeKeySelector selector) {
    switch (selector) {
    case RangeKeySelector::LAST_LESS_THAN:
        return {/*or_equal=*/0, /*offset=*/0};
    case RangeKeySelector::LAST_LESS_OR_EQUAL:
        return {/*or_equal=*/1, /*offset=*/0};
    case RangeKeySelector::FIRST_GREATER_THAN:
        return {/*or_equal=*/1, /*offset=*/1};
    case RangeKeySelector::FIRST_GREATER_OR_EQUAL:
        return {/*or_equal=*/0, /*offset=*/1};
    }
    // Only reachable for a value outside the enumerators (e.g. a bad cast).
    LOG(FATAL) << "Unknown RangeKeySelector: " << static_cast<int>(selector);
    return {0, 0};
}
int FdbTxnKv::init() {
network_ = std::make_shared<fdb::Network>();
int ret = network_->init();
if (ret != 0) {
LOG(WARNING) << "failed to init network";
return ret;
}
database_ = std::make_shared<fdb::Database>(network_, config::fdb_cluster_file_path,
FDBDatabaseOption {});
ret = database_->init();
if (ret != 0) {
LOG(WARNING) << "failed to init database";
return ret;
}
// Access the database to ensure the cluster file is valid, and eat the first cluster_version_changed error if any.
while (true) {
std::unique_ptr<Transaction> txn;
TxnErrorCode err = create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
LOG(WARNING) << "init fdb txn kv failed, create txn: " << err;
return -1;
}
std::string status_json_key = "\xff\xff/status/json";
std::string value;
err = txn->get(status_json_key, &value, true);
if (err == TxnErrorCode::TXN_RETRYABLE_NOT_COMMITTED) {
continue;
}
if (err != TxnErrorCode::TXN_OK) {
LOG(WARNING) << "init fdb txn kv failed, get status json key: " << err;
return -1;
}
LOG(INFO) << "fdb txn kv is initialized";
return 0;
}
}
// Creates a new FDB transaction and stores it in `*txn`.
// The transaction is handed to the caller even when init() fails; callers must
// check the returned code before using it.
TxnErrorCode FdbTxnKv::create_txn(std::unique_ptr<Transaction>* txn) {
    auto owned = std::make_unique<fdb::Transaction>(database_);
    fdb::Transaction* raw = owned.get();
    *txn = std::move(owned);
    const TxnErrorCode code = raw->init();
    if (code != TxnErrorCode::TXN_OK) {
        LOG(WARNING) << "failed to init txn, ret=" << code;
    }
    return code;
}
// Creates a transaction that may also access FDB system keys.
// Unlike create_txn(), `*txn` is only assigned when every step succeeded.
TxnErrorCode FdbTxnKv::create_txn_with_system_access(std::unique_ptr<Transaction>* txn) {
    auto candidate = std::make_unique<fdb::Transaction>(database_);
    TxnErrorCode code = candidate->init();
    // Only unlock the system key space once the transaction itself exists.
    code = (code == TxnErrorCode::TXN_OK) ? candidate->enable_access_system_keys() : code;
    if (code == TxnErrorCode::TXN_OK) {
        *txn = std::move(candidate);
        return TxnErrorCode::TXN_OK;
    }
    LOG(WARNING) << "failed to init txn, ret=" << code;
    return code;
}
// Returns an iterator over [begin, end) driven by fdb::FullRangeGetIterator.
// The bounds and options are moved into the iterator.
std::unique_ptr<FullRangeGetIterator> FdbTxnKv::full_range_get(std::string begin, std::string end,
                                                               FullRangeGetOptions opts) {
    std::unique_ptr<FullRangeGetIterator> iterator = std::make_unique<fdb::FullRangeGetIterator>(
            std::move(begin), std::move(end), std::move(opts));
    return iterator;
}
// Collects every key in [fdb_partition_key_prefix(), fdb_partition_key_end()) into
// `*boundaries` (which is cleared first). It uses snapshot reads in a transaction
// with system-key access.
// If a page read fails with TXN_TOO_OLD, a fresh transaction is created and the scan
// resumes from the next unread key. Any other error, or a failure to recreate the
// transaction, is returned to the caller.
TxnErrorCode FdbTxnKv::get_partition_boundaries(std::vector<std::string>* boundaries) {
    boundaries->clear();
    std::unique_ptr<Transaction> txn;
    TxnErrorCode code = create_txn_with_system_access(&txn);
    if (code != TxnErrorCode::TXN_OK) {
        return code;
    }

    std::string begin_key(fdb_partition_key_prefix());
    std::string end_key(fdb_partition_key_end());
    RangeGetOptions opts;
    opts.snapshot = true;
    std::unique_ptr<RangeGetIterator> iter;
    int num_iterations = 0;
    int num_kvs = 0;
    while (iter == nullptr /* may be not init */ || iter->more()) {
        code = txn->get(begin_key, end_key, &iter, opts);
        if (code != TxnErrorCode::TXN_OK) {
            LOG_WARNING("failed to get fdb boundaries")
                    .tag("code", code)
                    .tag("begin_key", hex(begin_key))
                    .tag("end_key", hex(end_key))
                    .tag("num_iterations", num_iterations)
                    .tag("num_kvs", num_kvs);
            if (code != TxnErrorCode::TXN_TOO_OLD) {
                return code;
            }
            // The read version expired: continue from `begin_key` with a new txn.
            code = create_txn_with_system_access(&txn);
            if (code != TxnErrorCode::TXN_OK) {
                // Only reported when a recreation was actually attempted.
                LOG_WARNING("failed to recreate txn when get fdb boundaries").tag("code", code);
                return code;
            }
            continue;
        }
        while (iter->has_next()) {
            auto&& [key, value] = iter->next();
            boundaries->emplace_back(key);
            ++num_kvs;
        }
        begin_key = iter->next_begin_key();
        ++num_iterations;
    }
    return TxnErrorCode::TXN_OK;
}
// Delegates to fdb_database_get_main_thread_busyness() for this database handle.
double FdbTxnKv::get_client_thread_busyness() const {
    auto* db = database_->db();
    return fdb_database_get_main_thread_busyness(db);
}
// Prefix-based convenience overload: each prefix becomes the pair
// (prefix, lexical_end(prefix)), and the work is delegated to the range-based
// batch_scan overload.
TxnErrorCode Transaction::batch_scan(
        std::vector<std::optional<std::pair<std::string, std::string>>>* res,
        const std::vector<std::string>& key_prefixs, const BatchGetOptions& opts) {
    std::vector<std::pair<std::string, std::string>> prefix_ranges;
    prefix_ranges.reserve(key_prefixs.size());
    for (size_t i = 0; i < key_prefixs.size(); ++i) {
        const std::string& prefix = key_prefixs[i];
        prefix_ranges.emplace_back(prefix, lexical_end(prefix));
    }
    return batch_scan(res, prefix_ranges, opts);
}
} // namespace doris::cloud
namespace doris::cloud::fdb {
// Value size above which put()/atomic_set_ver_*() log a "large value" warning
// (the write itself is still issued).
// https://apple.github.io/foundationdb/known-limitations.html#design-limitations
constexpr size_t FDB_VALUE_BYTES_LIMIT = 100'000; // 100 KB
// Numeric FDB error codes referenced by this file (cast_as_txn_code(), commit(),
// watch_key()).
// Ref https://apple.github.io/foundationdb/api-error-codes.html#developer-guide-error-codes.
constexpr fdb_error_t FDB_ERROR_CODE_TIMED_OUT = 1004;
constexpr fdb_error_t FDB_ERROR_CODE_TXN_TOO_OLD = 1007;
constexpr fdb_error_t FDB_ERROR_CODE_TXN_CONFLICT = 1020;
constexpr fdb_error_t FDB_ERROR_COMMIT_UNKNOWN_RESULT = 1021;
constexpr fdb_error_t FDB_ERROR_CODE_TXN_TIMED_OUT = 1031;
constexpr fdb_error_t FDB_ERROR_CODE_TOO_MANY_WATCHES = 1032;
constexpr fdb_error_t FDB_ERROR_CODE_CLUSTER_VERSION_CHANGED = 1039;
constexpr fdb_error_t FDB_ERROR_CODE_INVALID_OPTION_VALUE = 2006;
constexpr fdb_error_t FDB_ERROR_CODE_INVALID_OPTION = 2007;
constexpr fdb_error_t FDB_ERROR_CODE_VERSION_INVALID = 2011;
constexpr fdb_error_t FDB_ERROR_CODE_RANGE_LIMITS_INVALID = 2012;
constexpr fdb_error_t FDB_ERROR_CODE_TXN_TOO_LARGE = 2101;
constexpr fdb_error_t FDB_ERROR_CODE_KEY_TOO_LARGE = 2102;
constexpr fdb_error_t FDB_ERROR_CODE_VALUE_TOO_LARGE = 2103;
constexpr fdb_error_t FDB_ERROR_CODE_CONNECTION_STRING_INVALID = 2104;
// True iff `err` is FDB error 1020, the "not committed due to conflict" error.
static bool fdb_error_is_txn_conflict(fdb_error_t err) {
    switch (err) {
    case FDB_ERROR_CODE_TXN_CONFLICT:
        return true;
    default:
        return false;
    }
}
// Maps an FDB error code to the backend-neutral TxnErrorCode.
// Explicitly listed codes are matched first. Anything else is classified with FDB's
// MAYBE_COMMITTED and RETRYABLE_NOT_COMMITTED predicates (in that order), and the
// rest become TXN_UNIDENTIFIED_ERROR.
static TxnErrorCode cast_as_txn_code(fdb_error_t err) {
    switch (err) {
    case 0:
        return TxnErrorCode::TXN_OK;
    case FDB_ERROR_CODE_TXN_CONFLICT:
        return TxnErrorCode::TXN_CONFLICT;
    case FDB_ERROR_CODE_TXN_TOO_OLD:
        return TxnErrorCode::TXN_TOO_OLD;
    case FDB_ERROR_CODE_CLUSTER_VERSION_CHANGED:
        return TxnErrorCode::TXN_RETRYABLE_NOT_COMMITTED;
    case FDB_ERROR_CODE_TIMED_OUT:
    case FDB_ERROR_CODE_TXN_TIMED_OUT:
        return TxnErrorCode::TXN_TIMEOUT;
    case FDB_ERROR_CODE_TOO_MANY_WATCHES:
        return TxnErrorCode::TXN_TOO_MANY_WATCHES;
    case FDB_ERROR_CODE_TXN_TOO_LARGE:
        return TxnErrorCode::TXN_BYTES_TOO_LARGE;
    case FDB_ERROR_CODE_KEY_TOO_LARGE:
        return TxnErrorCode::TXN_KEY_TOO_LARGE;
    case FDB_ERROR_CODE_VALUE_TOO_LARGE:
        return TxnErrorCode::TXN_VALUE_TOO_LARGE;
    case FDB_ERROR_CODE_INVALID_OPTION:
    case FDB_ERROR_CODE_INVALID_OPTION_VALUE:
    case FDB_ERROR_CODE_VERSION_INVALID:
    case FDB_ERROR_CODE_RANGE_LIMITS_INVALID:
    case FDB_ERROR_CODE_CONNECTION_STRING_INVALID:
        return TxnErrorCode::TXN_INVALID_ARGUMENT;
    default:
        break;
    }

    if (fdb_error_predicate(FDB_ERROR_PREDICATE_MAYBE_COMMITTED, err)) {
        return TxnErrorCode::TXN_MAYBE_COMMITTED;
    }
    return fdb_error_predicate(FDB_ERROR_PREDICATE_RETRYABLE_NOT_COMMITTED, err)
                   ? TxnErrorCode::TXN_RETRYABLE_NOT_COMMITTED
                   : TxnErrorCode::TXN_UNIDENTIFIED_ERROR;
}
// =============================================================================
// Impl of Network
// =============================================================================
// Process-wide flag shared by all Network instances. Network::init() sets it, and the
// network thread's deleter clears it again only after the thread has been joined.
decltype(Network::working) Network::working {false};
// Selects the FDB API version, applies network options, sets up the FDB network and
// starts fdb_run_network() on a dedicated thread.
// Returns 0 on success and 1 on any failure, including when another Network already
// claimed the process-wide `working` flag.
int Network::init() {
    // Globally once: only one Network may drive the FDB client at a time.
    bool expected = false;
    if (!Network::working.compare_exchange_strong(expected, true)) return 1;

    fdb_error_t err = fdb_select_api_version(fdb_get_max_api_version());
    if (err) {
        LOG(WARNING) << "failed to select api version, max api version: "
                     << fdb_get_max_api_version() << ", err: " << fdb_get_error(err);
        return 1;
    }
    LOG(INFO) << "select fdb api version: " << fdb_get_max_api_version();

    // Setup network thread
    if (config::enable_fdb_external_client_directory &&
        !config::fdb_external_client_directory.empty()) {
        err = fdb_network_set_option(FDB_NET_OPTION_EXTERNAL_CLIENT_DIRECTORY,
                                     (const uint8_t*)config::fdb_external_client_directory.c_str(),
                                     config::fdb_external_client_directory.size());
        if (err) {
            LOG(WARNING) << "failed to set fdb external client directory, dir: "
                         << config::fdb_external_client_directory
                         << ", err: " << fdb_get_error(err);
            return 1;
        }
        LOG(INFO) << "set fdb external client directory: " << config::fdb_external_client_directory;
    }

    // ATTN: Network can be configured only once,
    // even if fdb_stop_network() is called successfully
    err = fdb_setup_network(); // Must be called only once before any
                               // other functions of C-API
    if (err) {
        LOG(WARNING) << "failed to setup fdb network, err: " << fdb_get_error(err);
        return 1;
    }

    // An optional completion hook could be registered here with
    // fdb_add_network_thread_completion_hook(); none is needed today.

    // Run network in a separate thread. The custom deleter stops the network, joins
    // the thread and only then releases `working` for a future Network.
    network_thread_ = std::shared_ptr<std::thread>(
            new std::thread([] {
                // Will not return until fdb_stop_network() called
                auto err = fdb_run_network();
                LOG(WARNING) << "exit fdb_run_network"
                             << (err ? std::string(", error: ") + fdb_get_error(err) : "");
            }),
            [](auto* p) {
                auto err = fdb_stop_network();
                LOG(WARNING) << "fdb_stop_network"
                             << (err ? std::string(", error: ") + fdb_get_error(err) : "");
                p->join();
                delete p;
                // Another network will work only after this thread exits
                bool expected = true;
                Network::working.compare_exchange_strong(expected, false);
            });

    // On Linux, thread names are limited to 16 bytes including the terminating NUL.
    // A longer name (the previous "fdb_network_thread" is 18 chars) makes
    // pthread_setname_np() fail with ERANGE and leaves the thread unnamed.
    if (int rc = pthread_setname_np(network_thread_->native_handle(), "fdb_network"); rc != 0) {
        LOG(WARNING) << "failed to set fdb network thread name, err: " << std::strerror(rc);
    }
    return 0;
}
// Drops this Network's reference to the network thread. If it is the last reference,
// the deleter installed by init() runs: it stops the FDB network, joins the thread
// and clears `working`.
void Network::stop() {
    network_thread_ = nullptr;
}
// =============================================================================
// Impl of Database
// =============================================================================
int Database::init() {
// TODO: process opt
fdb_error_t err = fdb_create_database(cluster_file_path_.c_str(), &db_);
if (err) {
LOG(WARNING) << __PRETTY_FUNCTION__ << " fdb_create_database error: " << fdb_get_error(err)
<< " conf: " << cluster_file_path_;
return 1;
}
return 0;
}
// =============================================================================
// Impl of Transaction
// =============================================================================
// Creates the underlying FDB transaction and applies per-transaction options: the
// mandatory timeout and, if config::enable_logging_conflict_keys is set,
// REPORT_CONFLICTING_KEYS.
// Returns the mapped error of the first step that fails.
TxnErrorCode Transaction::init() {
    // TODO: process opt
    fdb_error_t err = fdb_database_create_transaction(db_->db(), &txn_);
    TEST_SYNC_POINT_CALLBACK("transaction:init:create_transaction_err", &err);
    if (err != 0) {
        LOG(WARNING) << __PRETTY_FUNCTION__
                     << " fdb_database_create_transaction error:" << fdb_get_error(err);
        return cast_as_txn_code(err);
    }

    // FDB txn callback only guaranteed *at most once*, because the future might be set to
    // `Never` by unexpected. In order to achieve *exactly once* semantic, a timeout must be
    // set to force fdb cancel future and invoke callback eventually.
    //
    // See:
    // - https://apple.github.io/foundationdb/api-c.html#fdb_future_set_callback.
    // - https://forums.foundationdb.org/t/does-fdb-future-set-callback-guarantee-exactly-once-execution/1498/2
    static_assert(sizeof(config::fdb_txn_timeout_ms) == sizeof(int64_t));
    const auto* timeout_bytes = reinterpret_cast<const uint8_t*>(&config::fdb_txn_timeout_ms);
    err = fdb_transaction_set_option(txn_, FDBTransactionOption::FDB_TR_OPTION_TIMEOUT,
                                     timeout_bytes, sizeof(config::fdb_txn_timeout_ms));
    if (err != 0) {
        LOG_WARNING("fdb_transaction_set_option error: ")
                .tag("code", err)
                .tag("msg", fdb_get_error(err));
        return cast_as_txn_code(err);
    }

    if (!config::enable_logging_conflict_keys) {
        return TxnErrorCode::TXN_OK;
    }
    err = fdb_transaction_set_option(
            txn_, FDBTransactionOption::FDB_TR_OPTION_REPORT_CONFLICTING_KEYS, nullptr, 0);
    if (err != 0) {
        LOG_WARNING("fdb_transaction_set_option error: ")
                .tag("option", "FDB_TR_OPTION_REPORT_CONFLICTING_KEYS")
                .tag("code", err)
                .tag("msg", fdb_get_error(err));
        return cast_as_txn_code(err);
    }
    return TxnErrorCode::TXN_OK;
}
// Sets FDB_TR_OPTION_ACCESS_SYSTEM_KEYS on this transaction.
// Returns TXN_OK, or the mapped FDB error if the option cannot be set.
TxnErrorCode Transaction::enable_access_system_keys() {
    const fdb_error_t err = fdb_transaction_set_option(
            txn_, FDBTransactionOption::FDB_TR_OPTION_ACCESS_SYSTEM_KEYS, nullptr, 0);
    if (err == 0) {
        return TxnErrorCode::TXN_OK;
    }
    LOG_WARNING("fdb_transaction_set_option error: ")
            .tag("option", "FDB_TR_OPTION_ACCESS_SYSTEM_KEYS")
            .tag("code", err)
            .tag("msg", fdb_get_error(err));
    return cast_as_txn_code(err);
}
// Buffers a set of `key` -> `val` in this transaction and updates the per-txn stats.
// Values larger than FDB_VALUE_BYTES_LIMIT are still written but trigger a warning.
void Transaction::put(std::string_view key, std::string_view val) {
    StopWatch sw;
    fdb_transaction_set(txn_, reinterpret_cast<const uint8_t*>(key.data()), key.size(),
                        reinterpret_cast<const uint8_t*>(val.data()), val.size());
    g_bvar_txn_kv_put << sw.elapsed_us();

    const size_t key_size = key.size();
    const size_t val_size = val.size();
    ++num_put_keys_;
    put_bytes_ += key_size + val_size;
    // Same weighting as fdbclient/ReadYourWrites.actor.cpp: the key counts three times.
    approximate_bytes_ += key_size * 3 + val_size;

    if (val_size <= FDB_VALUE_BYTES_LIMIT) {
        return;
    }
    LOG_WARNING("txn put with large value")
            .tag("key", hex(key))
            .tag("value", hex(val.substr(0, 64)) + "...")
            .tag("value_size", val_size);
}
// Waits for `fut` to become ready from inside a bthread (await_future() routes here
// when bthread_self() != 0). Instead of calling fdb_future_block_until_ready(), it
// registers a callback that signals a bthread::CountdownEvent and waits on that
// event, presumably so the bthread worker is not blocked. This assumes the event's
// default initial count of 1.
// Returns TXN_OK once the callback has fired, the mapped FDB error if the callback
// cannot be registered, or TXN_UNIDENTIFIED_ERROR if waiting on the event fails.
// The future's own result is not inspected; callers use fdb_future_get_error().
// NOTE(review): if event.wait() fails, this returns while the callback may still be
// pending and could later signal `event`, which lives on this stack frame — confirm
// that CountdownEvent::wait() cannot fail here in practice.
static TxnErrorCode bthread_fdb_future_block_until_ready(FDBFuture* fut) {
bthread::CountdownEvent event;
// Captureless lambda so it converts to the C callback function pointer; the event is
// passed through the void* user-data argument.
static auto callback = [](FDBFuture*, void* event) {
((bthread::CountdownEvent*)event)->signal();
};
auto err = fdb_future_set_callback(fut, callback, &event);
if (err) [[unlikely]] {
LOG(WARNING) << "fdb_future_set_callback failed, err=" << fdb_get_error(err);
return cast_as_txn_code(err);
}
if (int ec = event.wait(); ec != 0) [[unlikely]] {
LOG(WARNING) << "CountdownEvent wait failed, err=" << std::strerror(ec);
return TxnErrorCode::TXN_UNIDENTIFIED_ERROR;
}
return TxnErrorCode::TXN_OK;
}
// Waits until `fut` is ready. Inside a bthread the wait is delegated to
// bthread_fdb_future_block_until_ready(); otherwise fdb_future_block_until_ready()
// blocks the calling thread.
// Returns TXN_OK or the mapped error of the wait itself. This is not the future's
// result; callers still need fdb_future_get_error().
static TxnErrorCode await_future(FDBFuture* fut) {
    const bool in_bthread = bthread_self() != 0;
    if (in_bthread) {
        return bthread_fdb_future_block_until_ready(fut);
    }
    const fdb_error_t err = fdb_future_block_until_ready(fut);
    if (err == 0) [[likely]] {
        return TxnErrorCode::TXN_OK;
    }
    LOG(WARNING) << "fdb_future_block_until_ready failed: " << fdb_get_error(err);
    return cast_as_txn_code(err);
}
// Point read of `key`. On success `*val` receives the value.
// Returns TXN_KEY_NOT_FOUND when the key is absent, or the mapped FDB error on failure.
// `snapshot` is forwarded to fdb_transaction_get().
TxnErrorCode Transaction::get(std::string_view key, std::string* val, bool snapshot) {
    may_logging_single_version_reading(key);
    StopWatch sw;
    approximate_bytes_ += key.size() * 2; // See fdbclient/ReadYourWrites.actor.cpp for details
    auto* fut = fdb_transaction_get(txn_, (uint8_t*)key.data(), key.size(), snapshot);
    g_bvar_txn_kv_get_count_normalized << 1;
    // Destroy the future and record latency on every exit path.
    DORIS_CLOUD_DEFER {
        fdb_future_destroy(fut);
        g_bvar_txn_kv_get << sw.elapsed_us();
    };
    RETURN_IF_ERROR(await_future(fut));
    auto err = fdb_future_get_error(fut);
    TEST_SYNC_POINT_CALLBACK("transaction:get:get_err", &err);
    if (err) {
        LOG(WARNING) << __PRETTY_FUNCTION__
                     << " failed to fdb_future_get_error err=" << fdb_get_error(err)
                     << " key=" << hex(key);
        return cast_as_txn_code(err);
    }
    fdb_bool_t found = 0;
    const uint8_t* ret = nullptr;
    int len = 0;
    err = fdb_future_get_value(fut, &found, &ret, &len);
    num_get_keys_++;
    if (err) {
        LOG(WARNING) << __PRETTY_FUNCTION__
                     << " failed to fdb_future_get_value err=" << fdb_get_error(err)
                     << " key=" << hex(key);
        return cast_as_txn_code(err);
    }
    get_bytes_ += len + key.size();
    if (!found) return TxnErrorCode::TXN_KEY_NOT_FOUND;
    // Copy out of the future's memory before the deferred destroy runs.
    val->assign(reinterpret_cast<const char*>(ret), len);
    return TxnErrorCode::TXN_OK;
}
// Range read between `begin` and `end`, using the key selectors, limit, snapshot and
// reverse settings in `opts`. On success `*iter` holds a RangeGetIterator over this
// batch; otherwise the mapped FDB error is returned and `*iter` is left untouched.
TxnErrorCode Transaction::get(std::string_view begin, std::string_view end,
                              std::unique_ptr<cloud::RangeGetIterator>* iter,
                              const RangeGetOptions& opts) {
    may_logging_single_version_reading(begin);
    StopWatch sw;
    approximate_bytes_ += begin.size() + end.size();
    DORIS_CLOUD_DEFER {
        g_bvar_txn_kv_range_get << sw.elapsed_us();
    };
    int limit = opts.batch_limit;
    fdb_bool_t snapshot = opts.snapshot ? 1 : 0;
    fdb_bool_t reverse = opts.reverse ? 1 : 0;
    auto [begin_or_equal, begin_offset] = apply_key_selector(opts.begin_key_selector);
    auto [end_or_equal, end_offset] = apply_key_selector(opts.end_key_selector);

    // Own the future until it is handed to the iterator. Previously the early returns
    // below (wait failure, future error) leaked it.
    struct FDBFutureDelete {
        void operator()(FDBFuture* future) const { fdb_future_destroy(future); }
    };
    std::unique_ptr<FDBFuture, FDBFutureDelete> fut(fdb_transaction_get_range(
            txn_, (uint8_t*)begin.data(), begin.size(), begin_or_equal, begin_offset,
            (uint8_t*)end.data(), end.size(), end_or_equal, end_offset, limit,
            0 /*target_bytes, unlimited*/, FDBStreamingMode::FDB_STREAMING_MODE_WANT_ALL,
            0 /*iteration*/, snapshot, reverse));
    RETURN_IF_ERROR(await_future(fut.get()));
    auto err = fdb_future_get_error(fut.get());
    TEST_SYNC_POINT_CALLBACK("transaction:get_range:get_err", &err);
    if (err) {
        LOG(WARNING) << fdb_get_error(err);
        return cast_as_txn_code(err);
    }

    // From here on the RangeGetIterator owns the future; this function never destroys
    // it after the hand-off.
    std::unique_ptr<RangeGetIterator> ret(new RangeGetIterator(fut.release()));
    RETURN_IF_ERROR(ret->init());
    num_get_keys_ += ret->size();
    get_bytes_ += ret->get_kv_bytes();
    g_bvar_txn_kv_get_count_normalized << ret->size();
    *(iter) = std::move(ret);
    return TxnErrorCode::TXN_OK;
}
std::unique_ptr<cloud::FullRangeGetIterator> Transaction::full_range_get(std::string_view begin,
std::string_view end,
FullRangeGetOptions opts) {
// We don't need to hold a reference to the TxnKv here, since there is a txn full range iterator.
opts.txn = this;
opts.txn_kv.reset();
return std::make_unique<FullRangeGetIterator>(std::string(begin), std::string(end), opts);
}
void Transaction::atomic_set_ver_key(std::string_view key_prefix, std::string_view val) {
StopWatch sw;
std::string key(key_prefix);
int prefix_size = key.size();
// ATTN:
// 10 bytes for versiontimestamp must be 0, trailing 4 bytes is for prefix len
key.append(14, '\0');
std::memcpy(key.data() + (key.size() - 4), &prefix_size, 4);
fdb_transaction_atomic_op(txn_, (uint8_t*)key.data(), key.size(), (uint8_t*)val.data(),
val.size(),
FDBMutationType::FDB_MUTATION_TYPE_SET_VERSIONSTAMPED_KEY);
g_bvar_txn_kv_atomic_set_ver_key << sw.elapsed_us();
++num_put_keys_;
put_bytes_ += key.size() + val.size();
approximate_bytes_ += key.size() * 3 + val.size();
if (val.size() > FDB_VALUE_BYTES_LIMIT) {
LOG_WARNING("atomic_set_ver_key with large value")
.tag("key", hex(key_prefix))
.tag("value", hex(val.substr(0, 64)) + "...")
.tag("value_size", val.size());
}
}
bool Transaction::atomic_set_ver_key(std::string_view key, uint32_t offset, std::string_view val) {
if (key.size() < 10 || offset + 10 > key.size()) {
LOG(WARNING) << "atomic_set_ver_key: invalid key or offset, key=" << hex(key)
<< " offset=" << offset << ", key_size=" << key.size();
return false;
}
StopWatch sw;
std::string key_buf(key);
// 4 bytes for prefix len, assume in letter-endian
key_buf.append((const char*)&offset, 4);
fdb_transaction_atomic_op(txn_, (uint8_t*)key_buf.data(), key_buf.size(), (uint8_t*)val.data(),
val.size(),
FDBMutationType::FDB_MUTATION_TYPE_SET_VERSIONSTAMPED_KEY);
g_bvar_txn_kv_atomic_set_ver_key << sw.elapsed_us();
++num_put_keys_;
put_bytes_ += key_buf.size() + val.size();
approximate_bytes_ += key_buf.size() * 3 + val.size();
if (val.size() > FDB_VALUE_BYTES_LIMIT) {
LOG_WARNING("atomic_set_ver_key with large value")
.tag("key", hex(key))
.tag("value", hex(val.substr(0, 64)) + "...")
.tag("value_size", val.size());
}
return true;
}
void Transaction::atomic_set_ver_value(std::string_view key, std::string_view value) {
StopWatch sw;
std::string val(value);
int prefix_size = val.size();
// ATTN:
// 10 bytes for versiontimestamp must be 0, trailing 4 bytes is for prefix len
val.append(14, '\0');
std::memcpy(val.data() + (val.size() - 4), &prefix_size, 4);
fdb_transaction_atomic_op(txn_, (uint8_t*)key.data(), key.size(), (uint8_t*)val.data(),
val.size(),
FDBMutationType::FDB_MUTATION_TYPE_SET_VERSIONSTAMPED_VALUE);
g_bvar_txn_kv_atomic_set_ver_value << sw.elapsed_us();
++num_put_keys_;
put_bytes_ += key.size() + val.size();
approximate_bytes_ += key.size() * 3 + val.size();
if (val.size() > FDB_VALUE_BYTES_LIMIT) {
LOG_WARNING("atomic_set_ver_value with large value")
.tag("key", hex(key))
.tag("value", hex(val.substr(0, 64)) + "...")
.tag("value_size", val.size());
}
}
void Transaction::atomic_add(std::string_view key, int64_t to_add) {
StopWatch sw;
auto val = std::make_unique<std::string>(sizeof(to_add), '\0');
std::memcpy(val->data(), &to_add, sizeof(to_add));
fdb_transaction_atomic_op(txn_, (uint8_t*)key.data(), key.size(), (uint8_t*)val->data(),
sizeof(to_add), FDBMutationType::FDB_MUTATION_TYPE_ADD);
g_bvar_txn_kv_atomic_add << sw.elapsed_us();
++num_put_keys_;
put_bytes_ += key.size() + 8;
approximate_bytes_ += key.size() * 3 + 8;
}
// Decodes an 8-byte counter value written via FDB_MUTATION_TYPE_ADD.
// Returns false, leaving `*val` untouched, if `data` is not exactly 8 bytes.
bool Transaction::decode_atomic_int(std::string_view data, int64_t* val) {
    constexpr size_t kWidth = sizeof(int64_t);
    if (data.size() != kWidth) {
        return false;
    }
    // ATTN: The FDB_MUTATION_TYPE_ADD stores integers in a little-endian representation.
    uint64_t raw = 0;
    std::memcpy(&raw, data.data(), kWidth);
    if constexpr (std::endian::native == std::endian::big) {
        raw = bswap_64(raw);
    }
    *val = static_cast<int64_t>(raw);
    return true;
}
// Buffers a single-key clear of `key` and updates the per-txn stats.
void Transaction::remove(std::string_view key) {
    StopWatch sw;
    fdb_transaction_clear(txn_, reinterpret_cast<const uint8_t*>(key.data()), key.size());
    g_bvar_txn_kv_remove << sw.elapsed_us();
    const size_t key_size = key.size();
    ++num_del_keys_;
    delete_bytes_ += key_size;
    approximate_bytes_ += 4 * key_size; // See fdbclient/ReadYourWrites.actor.cpp for details.
}
// Buffers a range clear of [begin, end) via fdb_transaction_clear_range. For the
// stats, a range clear counts as two deleted keys, one per bound.
void Transaction::remove(std::string_view begin, std::string_view end) {
    StopWatch sw;
    fdb_transaction_clear_range(txn_, reinterpret_cast<const uint8_t*>(begin.data()),
                                begin.size(), reinterpret_cast<const uint8_t*>(end.data()),
                                end.size());
    g_bvar_txn_kv_range_remove << sw.elapsed_us();
    const size_t bounds_size = begin.size() + end.size();
    num_del_keys_ += 2;
    delete_bytes_ += bounds_size;
    approximate_bytes_ += bounds_size * 2; // See fdbclient/ReadYourWrites.actor.cpp for details.
}
// Commits the transaction.
// A cluster_version_changed error is reported as TXN_MAYBE_COMMITTED. Conflicts bump
// the conflict counter and, if enabled, log the conflicting ranges. When
// enable_get_versionstamp() was called, the commit versionstamp is captured for
// get_versionstamp().
TxnErrorCode Transaction::commit() {
    fdb_error_t err = 0;
    TEST_INJECTION_POINT_CALLBACK("Transaction::commit.inject_random_fault", &err);
    TEST_SYNC_POINT_CALLBACK("transaction:commit:get_err", &err);
    // The versionstamp future has to be requested before committing.
    FDBFuture* versionstamp_fut = nullptr;
    if (versionstamp_enabled_) {
        versionstamp_fut = fdb_transaction_get_versionstamp(txn_);
    }
    DORIS_CLOUD_DEFER {
        if (versionstamp_fut) {
            fdb_future_destroy(versionstamp_fut);
        }
    };
    if (err == 0) [[likely]] {
        StopWatch sw;
        auto* fut = fdb_transaction_commit(txn_);
        // Destroy the commit future and record latency on every exit from this scope.
        DORIS_CLOUD_DEFER {
            fdb_future_destroy(fut);
            g_bvar_txn_kv_commit << sw.elapsed_us();
        };
        RETURN_IF_ERROR(await_future(fut));
        err = fdb_future_get_error(fut);
    }
    if (err) {
        LOG(WARNING) << "fdb commit error, code=" << err << " msg=" << fdb_get_error(err);
        if (fdb_error_is_txn_conflict(err)) {
            g_bvar_txn_kv_commit_conflict_counter << 1;
            static_cast<void>(report_conflicting_range()); // don't overwrite the original error.
        } else {
            g_bvar_txn_kv_commit_error_counter << 1;
        }
        // If cluster_version_changed is thrown during commit, it should be interpreted
        // similarly to commit_unknown_result. The commit may or may not have been completed.
        if (err == FDB_ERROR_CODE_CLUSTER_VERSION_CHANGED) {
            return TxnErrorCode::TXN_MAYBE_COMMITTED;
        }
        return cast_as_txn_code(err);
    }
    if (versionstamp_fut) {
        RETURN_IF_ERROR(await_future(versionstamp_fut));
        err = fdb_future_get_error(versionstamp_fut);
        if (err) {
            LOG(WARNING) << "get versionstamp error, code=" << err << " msg=" << fdb_get_error(err);
            return cast_as_txn_code(err);
        }
        const uint8_t* versionstamp_data = nullptr;
        int versionstamp_length = 0;
        err = fdb_future_get_key(versionstamp_fut, &versionstamp_data, &versionstamp_length);
        if (err) {
            LOG(WARNING) << "get versionstamp key error, code=" << err
                         << " msg=" << fdb_get_error(err);
            return cast_as_txn_code(err);
        }
        if (versionstamp_length == 10) {
            versionstamp_result_ = Versionstamp(versionstamp_data);
        } else {
            LOG(WARNING) << "unexpected versionstamp length: " << versionstamp_length;
        }
    }
    return TxnErrorCode::TXN_OK;
}
// Registers a watch on `key`, commits this transaction, then waits for the watch
// future. Per the FDB C API docs, a watch does not report changes before its
// transaction has committed, hence the commit first.
// Returns TXN_OK when the watch fires; otherwise the commit/wait/watch error.
TxnErrorCode Transaction::watch_key(std::string_view key) {
    StopWatch sw;
    FDBFuture* watch_fut = fdb_transaction_watch(txn_, (uint8_t*)key.data(), key.size());
    DORIS_CLOUD_DEFER {
        fdb_future_destroy(watch_fut);
        g_bvar_txn_kv_watch_key << sw.elapsed_us();
    };
    RETURN_IF_ERROR(commit());
    RETURN_IF_ERROR(await_future(watch_fut));
    fdb_error_t err = fdb_future_get_error(watch_fut);
    TEST_SYNC_POINT_CALLBACK("transaction:watch_key:get_err", &err);
    if (err == 0) {
        return TxnErrorCode::TXN_OK;
    }
    if (err == FDB_ERROR_COMMIT_UNKNOWN_RESULT) {
        fdb_future_cancel(watch_fut);
    }
    LOG(WARNING) << "fdb watch key " << hex(key) << ": " << fdb_get_error(err);
    return cast_as_txn_code(err);
}
// Fetches this transaction's read version into `*version`.
TxnErrorCode Transaction::get_read_version(int64_t* version) {
    StopWatch sw;
    FDBFuture* rv_fut = fdb_transaction_get_read_version(txn_);
    DORIS_CLOUD_DEFER {
        fdb_future_destroy(rv_fut);
        g_bvar_txn_kv_get_read_version << sw.elapsed_us();
    };
    RETURN_IF_ERROR(await_future(rv_fut));
    fdb_error_t err = fdb_future_get_error(rv_fut);
    TEST_SYNC_POINT_CALLBACK("transaction:get_read_version:get_err", &err);
    // Only extract the value when the future itself succeeded.
    if (err == 0) {
        err = fdb_future_get_int64(rv_fut, version);
    }
    if (err != 0) {
        LOG(WARNING) << "get read version: " << fdb_get_error(err);
        return cast_as_txn_code(err);
    }
    return TxnErrorCode::TXN_OK;
}
// Fetches the committed version of this transaction into `*version` via
// fdb_transaction_get_committed_version(). Latency is recorded on both paths.
TxnErrorCode Transaction::get_committed_version(int64_t* version) {
    StopWatch sw;
    DORIS_CLOUD_DEFER {
        g_bvar_txn_kv_get_committed_version << sw.elapsed_us();
    };
    const fdb_error_t err = fdb_transaction_get_committed_version(txn_, version);
    if (err == 0) {
        return TxnErrorCode::TXN_OK;
    }
    LOG(WARNING) << "get committed version " << fdb_get_error(err);
    return cast_as_txn_code(err);
}
// Always returns TXN_OK without calling into FDB.
// NOTE(review): buffered mutations are presumably discarded when the transaction
// object is destroyed — confirm in the destructor (not visible here).
TxnErrorCode Transaction::abort() {
return TxnErrorCode::TXN_OK;
}
// Makes the next commit() request the versionstamp future (it must be requested
// before committing) and store the result for get_versionstamp().
void Transaction::enable_get_versionstamp() {
versionstamp_enabled_ = true;
}
// Returns the versionstamp captured by commit().
// Returns TXN_INVALID_ARGUMENT if enable_get_versionstamp() was never called, and
// TXN_KEY_NOT_FOUND while no versionstamp has been captured (still the default value).
TxnErrorCode Transaction::get_versionstamp(Versionstamp* versionstamp) {
    if (versionstamp_enabled_) {
        if (!(versionstamp_result_ == Versionstamp())) {
            *versionstamp = versionstamp_result_;
            return TxnErrorCode::TXN_OK;
        }
        LOG(WARNING) << "versionstamp not available, commit may not have been called or failed";
        return TxnErrorCode::TXN_KEY_NOT_FOUND;
    }
    LOG(WARNING) << "get_versionstamp called but versionstamp not enabled";
    return TxnErrorCode::TXN_INVALID_ARGUMENT;
}
// Reads the special key range "\xff\xff/transaction/conflicting_keys/" and appends
// (key without that prefix, value) pairs to `*values`.
// The previous version looped `while (out_more)` on the SAME future. That re-reads an
// identical page, so a truncated result caused an infinite loop with duplicated
// entries. Each further page is now a new range read that starts strictly after the
// last key already returned.
TxnErrorCode Transaction::get_conflicting_range(
        std::vector<std::pair<std::string, std::string>>* values) {
    constexpr std::string_view start = "\xff\xff/transaction/conflicting_keys/";
    constexpr std::string_view end = "\xff\xff/transaction/conflicting_keys/\xff";
    int limit = 0;
    int target_bytes = 0;
    FDBStreamingMode mode = FDB_STREAMING_MODE_WANT_ALL;
    int iteration = 0;
    fdb_bool_t snapshot = 0;
    fdb_bool_t reverse = 0;

    // Begin selector of the next page: the first page uses FIRST_GREATER_OR_EQUAL(start)
    // (or_equal=0, offset=1); later pages use FIRST_GREATER_THAN(last key) (or_equal=1,
    // offset=1).
    std::string begin_key(start);
    fdb_bool_t begin_or_equal = 0;
    for (;;) {
        FDBFuture* future = fdb_transaction_get_range(
                txn_, reinterpret_cast<const uint8_t*>(begin_key.data()), begin_key.size(),
                begin_or_equal, 1 /*begin_offset*/,
                FDB_KEYSEL_FIRST_GREATER_OR_EQUAL((const uint8_t*)end.data(), end.size()), limit,
                target_bytes, mode, iteration, snapshot, reverse);
        DORIS_CLOUD_DEFER {
            fdb_future_destroy(future);
        };
        RETURN_IF_ERROR(await_future(future));

        FDBKeyValue const* out_kvs = nullptr;
        int out_kvs_count = 0;
        fdb_bool_t out_more = 0;
        fdb_error_t err =
                fdb_future_get_keyvalue_array(future, &out_kvs, &out_kvs_count, &out_more);
        if (err) {
            LOG(WARNING) << "get_conflicting_range get keyvalue array error: "
                         << fdb_get_error(err);
            return cast_as_txn_code(err);
        }
        for (int i = 0; i < out_kvs_count; i++) {
            std::string_view key((char*)out_kvs[i].key, out_kvs[i].key_length);
            std::string_view value((char*)out_kvs[i].value, out_kvs[i].value_length);
            key.remove_prefix(start.size());
            values->emplace_back(key, value);
        }
        if (!out_more || out_kvs_count == 0) {
            break;
        }
        // Copy the last key before the future (which owns its bytes) is destroyed.
        const FDBKeyValue& last = out_kvs[out_kvs_count - 1];
        begin_key.assign(reinterpret_cast<const char*>(last.key), last.key_length);
        begin_or_equal = 1;
    }
    return TxnErrorCode::TXN_OK;
}
// When config::enable_logging_conflict_keys is set, logs the conflicting key ranges
// of this transaction as "[begin, end): <value>" entries.
// The special key space returns alternating begin/end boundary keys, so an odd count
// is reported as TXN_INVALID_DATA.
TxnErrorCode Transaction::report_conflicting_range() {
    if (!config::enable_logging_conflict_keys) {
        return TxnErrorCode::TXN_OK;
    }
    std::vector<std::pair<std::string, std::string>> key_values;
    RETURN_IF_ERROR(get_conflicting_range(&key_values));
    // See https://github.com/apple/foundationdb/pull/2257/files for detail.
    if (key_values.size() % 2 != 0) {
        LOG(WARNING) << "the conflicting range is not well-formed, size=" << key_values.size();
        return TxnErrorCode::TXN_INVALID_DATA;
    }
    std::string summary;
    for (auto it = key_values.begin(); it != key_values.end(); it += 2) {
        std::string_view range_begin = it->first;
        std::string_view range_end = (it + 1)->first;
        std::string_view conflict_count = it->second;
        if (!summary.empty()) {
            summary.append(", ");
        }
        summary.append(
                fmt::format("[{}, {}): {}", hex(range_begin), hex(range_end), conflict_count));
    }
    LOG(WARNING) << "conflicting key ranges: " << summary;
    return TxnErrorCode::TXN_OK;
}
// Resets the cursor state and loads the key-value array (plus the "more" flag) from
// the held future. Returns TXN_UNIDENTIFIED_ERROR if no future is held.
TxnErrorCode RangeGetIterator::init() {
    if (!fut_) {
        return TxnErrorCode::TXN_UNIDENTIFIED_ERROR;
    }
    kvs_ = nullptr;
    kvs_size_ = 0;
    more_ = false;
    idx_ = 0;
    fdb_error_t err = fdb_future_get_keyvalue_array(fut_, &kvs_, &kvs_size_, &more_);
    TEST_SYNC_POINT_CALLBACK("range_get_iterator:init:get_keyvalue_array_err", &err);
    if (err == 0) {
        return TxnErrorCode::TXN_OK;
    }
    LOG(WARNING) << "fdb_future_get_keyvalue_array failed, err=" << fdb_get_error(err);
    return cast_as_txn_code(err);
}
// Reads `keys` in batches of `opts.concurrency` concurrently issued gets.
// `*res` is cleared, then gets one entry per key in input order (std::nullopt for
// absent keys). The first failing key aborts the call with its mapped error; any
// outstanding futures are then destroyed.
TxnErrorCode Transaction::batch_get(std::vector<std::optional<std::string>>* res,
                                    const std::vector<std::string>& keys,
                                    const BatchGetOptions& opts) {
    struct FDBFutureDelete {
        void operator()(FDBFuture* future) const { fdb_future_destroy(future); }
    };
    res->clear();
    if (keys.empty()) {
        return TxnErrorCode::TXN_OK;
    }
    StopWatch sw;
    DORIS_CLOUD_DEFER {
        g_bvar_txn_kv_batch_get << sw.elapsed_us();
    };
    const size_t num_keys = keys.size();
    // A non-positive concurrency would never advance the batch loop below (infinite
    // loop), so fall back to one key per batch.
    const size_t concurrency =
            opts.concurrency > 0 ? static_cast<size_t>(opts.concurrency) : size_t {1};
    res->reserve(num_keys);
    g_bvar_txn_kv_get_count_normalized << keys.size();
    std::vector<std::unique_ptr<FDBFuture, FDBFutureDelete>> futures;
    futures.reserve(std::min(concurrency, num_keys));
    for (size_t batch_begin = 0; batch_begin < num_keys; batch_begin += concurrency) {
        const size_t batch_end = std::min(batch_begin + concurrency, num_keys);
        for (size_t j = batch_begin; j < batch_end; ++j) {
            const std::string& k = keys[j];
            may_logging_single_version_reading(k);
            futures.emplace_back(
                    fdb_transaction_get(txn_, (uint8_t*)k.data(), k.size(), opts.snapshot));
            approximate_bytes_ += k.size() * 2;
        }
        for (size_t j = 0; j < futures.size(); ++j) {
            FDBFuture* future = futures[j].get();
            std::string_view key = keys[batch_begin + j];
            RETURN_IF_ERROR(await_future(future));
            fdb_error_t err = fdb_future_get_error(future);
            if (err) {
                LOG(WARNING) << __PRETTY_FUNCTION__
                             << " failed to fdb_future_get_error err=" << fdb_get_error(err)
                             << " key=" << hex(key);
                return cast_as_txn_code(err);
            }
            fdb_bool_t found = 0;
            const uint8_t* ret = nullptr;
            int len = 0;
            err = fdb_future_get_value(future, &found, &ret, &len);
            num_get_keys_++;
            if (err) {
                LOG(WARNING) << __PRETTY_FUNCTION__
                             << " failed to fdb_future_get_value err=" << fdb_get_error(err)
                             << " key=" << hex(key);
                return cast_as_txn_code(err);
            }
            if (!found) {
                res->push_back(std::nullopt);
                continue;
            }
            get_bytes_ += len + key.size();
            res->emplace_back(std::string(reinterpret_cast<const char*>(ret), len));
        }
        futures.clear();
    }
    DCHECK_EQ(res->size(), num_keys);
    return TxnErrorCode::TXN_OK;
}
// For each [start, end) in `ranges`, reads a single key-value pair with limit 1.
// FDB returns the first pair in the range, or the last one when `opts.reverse` is set.
// At most `opts.concurrency` range reads are in flight at a time.
// `res` receives exactly one entry per range, in input order; an empty range yields
// std::nullopt.
//
// Returns TXN_OK on success. Otherwise returns the first error encountered, in which case
// `res` may hold only a prefix of the results. Latency is recorded in
// g_bvar_txn_kv_range_get.
TxnErrorCode Transaction::batch_scan(
        std::vector<std::optional<std::pair<std::string, std::string>>>* res,
        const std::vector<std::pair<std::string, std::string>>& ranges,
        const BatchGetOptions& opts) {
    struct FDBFutureDelete {
        void operator()(FDBFuture* future) { fdb_future_destroy(future); }
    };
    res->clear();
    if (ranges.empty()) {
        return TxnErrorCode::TXN_OK;
    }
    StopWatch sw;
    auto stop_watcher = [&sw](int*) { g_bvar_txn_kv_range_get << sw.elapsed_us(); };
    std::unique_ptr<int, decltype(stop_watcher)> defer((int*)0x01, std::move(stop_watcher));

    const size_t num_ranges = ranges.size();
    // A non-positive concurrency would leave the outer loop stuck at `i += 0`.
    // Treat it as "issue all ranges in a single batch" instead of hanging.
    const size_t concurrency =
            opts.concurrency > 0 ? static_cast<size_t>(opts.concurrency) : num_ranges;
    res->reserve(num_ranges);
    g_bvar_txn_kv_get_count_normalized << num_ranges;

    std::vector<std::unique_ptr<FDBFuture, FDBFutureDelete>> futures;
    // Never reserve more slots than there are ranges.
    futures.reserve(std::min(concurrency, num_ranges));
    fdb_bool_t snapshot = opts.snapshot ? 1 : 0;
    fdb_bool_t reverse = opts.reverse ? 1 : 0;
    for (size_t i = 0; i < num_ranges; i += concurrency) {
        const size_t batch_end = std::min(i + concurrency, num_ranges);
        for (size_t j = i; j < batch_end; j++) {
            const auto& [start, end] = ranges[j];
            // Each key selector must carry its own key's length. Using `end.size()` for the
            // start key scans the wrong range, or reads past `start` when `end` is longer.
            FDBFuture* fut = fdb_transaction_get_range(
                    txn_, FDB_KEYSEL_FIRST_GREATER_OR_EQUAL((uint8_t*)start.data(), start.size()),
                    FDB_KEYSEL_FIRST_GREATER_OR_EQUAL((uint8_t*)end.data(), end.size()),
                    1, // limit: take the first one
                    0, // target_bytes, unlimited
                    FDBStreamingMode::FDB_STREAMING_MODE_WANT_ALL,
                    0, // iteration
                    snapshot, reverse);
            futures.emplace_back(fut);
            approximate_bytes_ += start.size() + end.size();
        }
        const size_t num_futures = futures.size();
        for (size_t j = 0; j < num_futures; j++) {
            FDBFuture* future = futures[j].get();
            const auto& [start, end] = ranges[i + j];
            RETURN_IF_ERROR(await_future(future));
            fdb_error_t err = fdb_future_get_error(future);
            if (err) {
                LOG(WARNING) << __PRETTY_FUNCTION__
                             << " failed to fdb_future_get_error err=" << fdb_get_error(err)
                             << " start=" << hex(start) << " end=" << hex(end);
                return cast_as_txn_code(err);
            }
            const FDBKeyValue* kvs;
            int kvs_size;
            fdb_bool_t more;
            err = fdb_future_get_keyvalue_array(future, &kvs, &kvs_size, &more);
            num_get_keys_++;
            if (err) {
                LOG(WARNING) << __PRETTY_FUNCTION__
                             << " failed to fdb_future_get_keyvalue_array err="
                             << fdb_get_error(err) << " start=" << hex(start)
                             << " end=" << hex(end);
                return cast_as_txn_code(err);
            }
            if (kvs_size == 0) {
                res->push_back(std::nullopt);
            } else {
                const FDBKeyValue& kv = kvs[0];
                get_bytes_ += kv.value_length + kv.key_length;
                std::string output_key((char*)kv.key, kv.key_length);
                std::string output_value((char*)kv.value, kv.value_length);
                res->emplace_back(std::make_pair(std::move(output_key), std::move(output_value)));
            }
        }
        // Destroy this batch's futures before issuing the next batch.
        futures.clear();
    }
    DCHECK_EQ(res->size(), num_ranges);
    return TxnErrorCode::TXN_OK;
}
// Builds a lazy iterator over [begin, end). No request is issued until the first
// has_next()/next()/peek() call.
// Both `opts.txn_kv` and `opts.txn` are optional, but whichever is provided must be the
// FoundationDB implementation: this iterator casts them to fdb types and calls the fdb
// C API directly.
FullRangeGetIterator::FullRangeGetIterator(std::string begin, std::string end,
                                           FullRangeGetOptions opts)
        : opts_(std::move(opts)), begin_(std::move(begin)), end_(std::move(end)) {
    DCHECK(!opts_.txn_kv || dynamic_cast<FdbTxnKv*>(opts_.txn_kv.get()));
    DCHECK(!opts_.txn || dynamic_cast<fdb::Transaction*>(opts_.txn)) << opts_.txn;
}
// Releases any outstanding range-read future (e.g. an unconsumed prefetch). The future is
// awaited first; its result is intentionally discarded because there is no caller left to
// report it to.
FullRangeGetIterator::~FullRangeGetIterator() {
    if (fut_ == nullptr) {
        return;
    }
    static_cast<void>(fdb::await_future(fut_));
    fdb_future_destroy(fut_);
}
// Returns true if another key-value pair can be returned by next()/peek(), fetching the
// next batch from FDB when the current one is exhausted.
//
// Returns false when:
// - is_valid() fails (presumably an error was recorded in code_ — confirm), or
// - `opts_.exact_limit` keys have already been consumed, or
// - the current batch is exhausted and FDB reported no more data.
// May block while waiting for a batch to arrive.
bool FullRangeGetIterator::has_next() {
    if (!is_valid()) {
        return false;
    }
    // Stop once the caller-requested number of keys has been handed out.
    if (opts_.exact_limit > 0 && num_consumed_ >= opts_.exact_limit) {
        return false;
    }
    if (!inner_iter_) {
        // The first call
        // Issue the first range read and wait for it synchronously.
        init();
        if (!is_valid()) {
            return false;
        }
        return inner_iter_->has_next();
    }
    if (inner_iter_->has_next()) {
        // The current batch still has data. If allowed, issue the request for the following
        // batch now, so it may already be ready when this batch runs out.
        if (prefetch()) {
            TEST_SYNC_POINT("fdb.FullRangeGetIterator.has_next_prefetch");
            async_get_next_batch();
        }
        return true;
    }
    // Current batch exhausted, and FDB said there is nothing more in the range.
    if (!inner_iter_->more()) {
        return false;
    }
    // No prefetch is in flight, so issue the next-batch request now.
    if (!fut_) {
        async_get_next_batch();
        if (!is_valid()) {
            return false;
        }
    }
    // Wait for the pending batch (prefetched or just issued) and switch inner_iter_ to it.
    await_future();
    return is_valid() ? inner_iter_->has_next() : false;
}
// Returns the next key-value pair and advances the iterator, or std::nullopt when
// has_next() is false. The returned views point into the batch that produced them.
std::optional<std::pair<std::string_view, std::string_view>> FullRangeGetIterator::next() {
    if (has_next()) {
        ++num_consumed_;
        return inner_iter_->next();
    }
    return std::nullopt;
}
// Returns the next key-value pair without consuming it, or std::nullopt when has_next()
// is false. has_next() may still fetch a new batch as a side effect.
std::optional<std::pair<std::string_view, std::string_view>> FullRangeGetIterator::peek() {
    if (has_next()) {
        return inner_iter_->peek();
    }
    return std::nullopt;
}
void FullRangeGetIterator::await_future() {
auto ret = fdb::await_future(fut_);
if (ret != TxnErrorCode::TXN_OK) {
code_ = ret;
return;
}
auto err = fdb_future_get_error(fut_);
if (err) {
code_ = cast_as_txn_code(err);
LOG(WARNING) << fdb_get_error(err);
return;
}
if (opts_.obj_pool && inner_iter_) {
opts_.obj_pool->push_back(std::move(inner_iter_));
}
inner_iter_ = std::make_unique<RangeGetIterator>(fut_);
fut_ = nullptr;
code_ = inner_iter_->init();
}
// Issues the first range read over [begin_, end_) and waits for it, so that inner_iter_
// is populated unless an error was recorded.
void FullRangeGetIterator::init() {
    async_inner_get(begin_, end_);
    if (is_valid()) {
        await_future();
    }
}
// Decides whether the next batch should be requested ahead of time. All of these must hold:
// - prefetching is enabled and the iterator is valid;
// - no request is already pending;
// - FDB reported more data after the current batch;
// - when exact_limit is set, the keys consumed plus the keys still buffered fall short of it.
bool FullRangeGetIterator::prefetch() {
    if (!opts_.prefetch || !is_valid() || fut_ != nullptr || !inner_iter_->more()) {
        return false;
    }
    if (opts_.exact_limit <= 0) {
        return true;
    }
    return num_consumed_ + inner_iter_->remaining() < opts_.exact_limit;
}
// Issues a non-blocking fdb range read over [begin, end), honoring the configured key
// selectors, snapshot/reverse flags and limits, and stores the future in `fut_`.
//
// When no transaction was supplied in `opts_.txn`, a fresh one is created from
// `opts_.txn_kv` and owned by `txn_`, replacing any previous one. On failure code_ is set
// and no future is issued.
void FullRangeGetIterator::async_inner_get(std::string_view begin, std::string_view end) {
    DCHECK(!fut_);
    auto* txn = static_cast<Transaction*>(opts_.txn);
    if (!txn) {
        // Create a new txn for each inner range get
        std::unique_ptr<cloud::Transaction> txn1;
        // TODO(plat1ko): Async create txn
        TxnErrorCode err = opts_.txn_kv->create_txn(&txn1);
        if (err != TxnErrorCode::TXN_OK) {
            code_ = err;
            return;
        }
        txn_.reset(static_cast<Transaction*>(txn1.release()));
        txn = txn_.get();
    }
    // TODO(plat1ko): Support `Transaction::async_get` api
    // FDB treats a limit of 0 as "no limit", so a non-positive batch_limit means an
    // unbounded batch.
    int limit = std::max(opts_.batch_limit, 0);
    if (opts_.exact_limit > 0) {
        // Count keys already handed out plus keys still buffered in the current batch.
        int consumed = num_consumed_ + (inner_iter_ ? inner_iter_->remaining() : 0);
        int wanted = std::max(0, opts_.exact_limit - consumed);
        // Cap the batch by the number of keys still wanted. An unbounded batch (limit == 0)
        // must be capped as well; `std::min(0, wanted)` would keep it unbounded and fetch
        // the whole remaining range.
        limit = limit > 0 ? std::min(limit, wanted) : wanted;
    }
    fdb_bool_t snapshot = opts_.snapshot ? 1 : 0;
    fdb_bool_t reverse = opts_.reverse ? 1 : 0;
    auto [begin_or_equal, begin_offset] = apply_key_selector(opts_.begin_key_selector);
    auto [end_or_equal, end_offset] = apply_key_selector(opts_.end_key_selector);
    fut_ = fdb_transaction_get_range(txn->txn_, (uint8_t*)begin.data(), begin.size(),
                                     begin_or_equal, begin_offset, (uint8_t*)end.data(), end.size(),
                                     end_or_equal, end_offset, limit, 0 /*target_bytes, unlimited*/,
                                     FDBStreamingMode::FDB_STREAMING_MODE_WANT_ALL, 0 /*iteration*/,
                                     snapshot, reverse);
}
// Issues the read for the batch that follows the current inner_iter_ batch.
// - Forward scans resume at inner_iter_->next_begin_key() and keep end_.
// - Reverse scans keep begin_ and use the current batch's last (smallest) key as the new
//   exclusive end.
// Either way, the moved boundary's selector becomes FIRST_GREATER_OR_EQUAL.
void FullRangeGetIterator::async_get_next_batch() {
    if (!opts_.reverse) {
        opts_.begin_key_selector = RangeKeySelector::FIRST_GREATER_OR_EQUAL;
        std::string resume_key = inner_iter_->next_begin_key();
        async_inner_get(resume_key, end_);
        return;
    }
    // The end key is exclusive under FIRST_GREATER_OR_EQUAL, so passing the last key
    // already returned excludes it from the next batch.
    opts_.end_key_selector = RangeKeySelector::FIRST_GREATER_OR_EQUAL;
    async_inner_get(begin_, inner_iter_->last_key());
}
} // namespace doris::cloud::fdb