blob: 81755043ea16a025a87618f1a9adda2c8ce83c44 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "mem_txn_kv.h"
#include <glog/logging.h>
#include <algorithm>
#include <cstdint>
#include <cstring>
#include <memory>
#include <mutex>
#include <optional>
#include <ostream>
#include <ranges>
#include <string>
#include <vector>
#include "common/util.h"
#include "cpp/sync_point.h"
#include "meta-store/txn_kv_error.h"
#include "txn_kv.h"
namespace doris::cloud {
// Initializes the in-memory store. Nothing needs to be set up here, so this
// unconditionally returns 0.
int MemTxnKv::init() {
return 0;
}
// Creates a new in-memory transaction bound to this store and hands ownership
// to `*txn`. Always returns TXN_OK.
TxnErrorCode MemTxnKv::create_txn(std::unique_ptr<Transaction>* txn) {
    // The transaction keeps a shared_ptr back to the store it was created from.
    txn->reset(new memkv::Transaction(shared_from_this()));
    return TxnErrorCode::TXN_OK;
}
// Point read: copies into `*val` the value of `key` from the first stored entry
// whose commit version is <= `version`.
// Returns TXN_KEY_NOT_FOUND when the key is unknown, has no such entry, or that
// entry carries no value; TXN_OK otherwise.
TxnErrorCode MemTxnKv::get_kv(const std::string& key, std::string* val, int64_t version) {
    std::lock_guard<std::mutex> guard(lock_);
    const auto found = mem_kv_.find(key);
    if (found == mem_kv_.end()) {
        return TxnErrorCode::TXN_KEY_NOT_FOUND;
    }
    // An empty version list simply falls through to the "not found" return below.
    for (const auto& candidate : found->second) {
        if (candidate.commit_version > version) {
            continue; // committed after the requested version, not visible
        }
        if (!candidate.value.has_value()) {
            // The visible entry holds no value (written by REMOVE / REMOVE_RANGE).
            return TxnErrorCode::TXN_KEY_NOT_FOUND;
        }
        *val = *candidate.value;
        return TxnErrorCode::TXN_OK;
    }
    return TxnErrorCode::TXN_KEY_NOT_FOUND;
}
// Range read over the keys selected by `begin`/`end` and the key selectors in
// `opts`, returning for each key the value of the first stored entry whose
// commit version is <= `version`. Keys whose visible entry has no value are
// skipped.
//
// @param begin, end  range bounds; an empty range (begin >= end) yields an
//                    empty result and TXN_OK.
// @param version     read version used to pick the visible entry of each key.
// @param opts        batch_limit (0 = unlimited, < 0 is rejected), reverse
//                    flag and begin/end key selectors.
// @param more        set to true when the scan stopped because the batch limit
//                    was reached, false otherwise.
// @param kv_list     receives the result (previous content is discarded).
// @return TXN_OK, or TXN_UNIDENTIFIED_ERROR for a negative batch limit.
TxnErrorCode MemTxnKv::get_kv(const std::string& begin, const std::string& end, int64_t version,
                              const RangeGetOptions& opts, bool* more,
                              std::vector<std::pair<std::string, std::string>>* kv_list) {
    if (begin >= end) {
        // Report an empty range the same way as the other empty-result exits below,
        // instead of leaving the out-parameters untouched.
        kv_list->clear();
        *more = false;
        return TxnErrorCode::TXN_OK;
    }
    const int limit = opts.batch_limit;
    if (limit < 0) {
        return TxnErrorCode::TXN_UNIDENTIFIED_ERROR;
    }
    const bool use_limit = (limit != 0);

    std::unique_lock<std::mutex> l(lock_);

    // Resolves a key selector to an iterator into mem_kv_.
    auto apply_key_selector = [&](RangeKeySelector selector,
                                  const std::string& key) -> decltype(mem_kv_.lower_bound(key)) {
        auto iter = mem_kv_.lower_bound(key);
        switch (selector) {
        case RangeKeySelector::FIRST_GREATER_OR_EQUAL:
            break;
        case RangeKeySelector::FIRST_GREATER_THAN:
            if (iter != mem_kv_.end() && iter->first == key) {
                ++iter;
            }
            break;
        case RangeKeySelector::LAST_LESS_OR_EQUAL:
            // lower_bound() returns end() when every stored key is < key; end() must
            // not be dereferenced, and in that case the last key is the answer.
            if (iter != mem_kv_.begin() && (iter == mem_kv_.end() || iter->first != key)) {
                --iter;
            }
            break;
        case RangeKeySelector::LAST_LESS_THAN:
            if (iter != mem_kv_.begin()) {
                --iter;
            }
            break;
        }
        return iter;
    };

    *more = false;
    std::vector<std::pair<std::string, std::string>> temp_results;

    // Appends the visible value of the key at `kv_iter`, if it has one.
    auto append_visible = [&](const auto& kv_iter) {
        for (const auto& entry : kv_iter->second) {
            if (entry.commit_version > version) {
                continue;
            }
            if (entry.value.has_value()) {
                temp_results.emplace_back(kv_iter->first, *entry.value);
            }
            return; // only the first visible entry counts
        }
    };
    auto limit_reached = [&]() {
        return use_limit && temp_results.size() >= static_cast<size_t>(limit);
    };

    auto begin_iter = apply_key_selector(opts.begin_key_selector, begin);
    auto end_iter = apply_key_selector(opts.end_key_selector, end);

    if (!opts.reverse) {
        // Forward iteration over [begin_iter, end_iter).
        if (begin_iter == mem_kv_.end() ||
            (end_iter != mem_kv_.end() && end_iter->first < begin_iter->first)) {
            // Nothing at or after begin, or end lies before begin: empty result.
            kv_list->clear();
            *more = false;
            return TxnErrorCode::TXN_OK;
        }
        for (; begin_iter != end_iter; ++begin_iter) {
            append_visible(begin_iter);
            if (limit_reached()) {
                *more = true;
                break;
            }
        }
    } else {
        // Reverse iteration: walk from the key just before end_iter down to begin_iter.
        if (begin_iter == mem_kv_.end() ||
            (end_iter != mem_kv_.end() && end_iter->first <= begin_iter->first)) {
            kv_list->clear();
            *more = false;
            return TxnErrorCode::TXN_OK;
        }
        do {
            --end_iter; // end always excludes the last key
            append_visible(end_iter);
            if (limit_reached()) {
                *more = true;
                break;
            }
        } while (end_iter != begin_iter);
    }
    kv_list->swap(temp_results);
    return TxnErrorCode::TXN_OK;
}
// Commits a transaction's buffered operations into the store.
//
// @param read_set           keys the transaction read (non-snapshot); used for
//                           conflict detection against the commit log.
// @param op_list            ordered (op type, key, value) operations to apply.
// @param read_version       version the transaction read at.
// @param committed_version  receives the new commit version on success.
// @return TXN_CONFLICT if any key in `read_set` has a logged operation newer
//         than `read_version` (nothing is applied in that case), else TXN_OK.
//
// After applying the operations, watches registered on the modified keys are
// triggered. The whole call runs under lock_.
TxnErrorCode MemTxnKv::update(const std::set<std::string>& read_set,
                              const std::vector<OpTuple>& op_list, int64_t read_version,
                              int64_t* committed_version) {
    std::lock_guard<std::mutex> l(lock_);

    // Conflict check: the newest log item of each read key must not be newer than
    // the read version. The log list is inspected in place (no copy).
    for (const auto& k : read_set) {
        auto iter = log_kv_.find(k);
        if (iter == log_kv_.end() || iter->second.empty()) {
            continue;
        }
        const auto& newest = iter->second.front();
        if (newest.commit_version_ > read_version) {
            LOG(WARNING) << "commit conflict, key: " << k
                         << ", log_version: " << newest.commit_version_
                         << ", read_version: " << read_version;
            // keep the same behaviour with fdb.
            return TxnErrorCode::TXN_CONFLICT;
        }
    }

    ++committed_version_;
    const int16_t seq = 0;
    std::set<std::string> modified_keys; // Track which keys were modified

    for (const auto& vec : op_list) {
        const auto& [op_type, k, v] = vec;
        log_kv_[k].push_front(LogItem {op_type, committed_version_, k, v});

        switch (op_type) {
        case memkv::ModifyOpType::PUT: {
            mem_kv_[k].push_front(Version {committed_version_, v});
            modified_keys.insert(k);
            break;
        }
        case memkv::ModifyOpType::ATOMIC_SET_VER_KEY: {
            // The key carries a placeholder that is filled with the commit version.
            std::string ver_key(k);
            gen_version_timestamp(committed_version_, seq, &ver_key);
            mem_kv_[ver_key].push_front(Version {committed_version_, v});
            modified_keys.insert(ver_key);
            break;
        }
        case memkv::ModifyOpType::ATOMIC_SET_VER_VAL: {
            // The value carries a placeholder that is filled with the commit version.
            std::string ver_val(v);
            gen_version_timestamp(committed_version_, seq, &ver_val);
            mem_kv_[k].push_front(Version {committed_version_, ver_val});
            modified_keys.insert(k);
            break;
        }
        case memkv::ModifyOpType::ATOMIC_ADD: {
            auto& versions = mem_kv_[k];
            std::string org_val;
            if (!versions.empty()) {
                org_val = versions.front().value.value_or("");
            }
            // The stored operand is treated as exactly 8 bytes (zero padded / truncated).
            org_val.resize(8, '\0');
            // Load both operands with memcpy: string storage is not guaranteed to be
            // aligned for int64_t, and `v` may be shorter than 8 bytes. Unsigned
            // arithmetic gives well-defined wrap-around with the same bit pattern.
            uint64_t lhs = 0;
            uint64_t rhs = 0;
            std::memcpy(&lhs, org_val.data(), sizeof(lhs));
            std::memcpy(&rhs, v.data(), std::min(v.size(), sizeof(rhs)));
            const uint64_t sum = lhs + rhs;
            std::memcpy(org_val.data(), &sum, sizeof(sum));
            versions.push_front(Version {committed_version_, org_val});
            modified_keys.insert(k);
            break;
        }
        case memkv::ModifyOpType::REMOVE: {
            // A version without a value marks the key as removed.
            mem_kv_[k].push_front(Version {committed_version_, std::nullopt});
            modified_keys.insert(k);
            break;
        }
        case memkv::ModifyOpType::REMOVE_RANGE: {
            // For REMOVE_RANGE, `k` is the range begin and `v` the (exclusive) end.
            // Skip inverted/empty ranges: iterating them would run past end().
            if (k < v) {
                const auto last = mem_kv_.lower_bound(v);
                for (auto it = mem_kv_.lower_bound(k); it != last; ++it) {
                    it->second.push_front(Version {committed_version_, std::nullopt});
                    modified_keys.insert(it->first);
                }
            }
            break;
        }
        default:
            break;
        }
    }

    // Trigger watches for all modified keys
    for (const auto& key : modified_keys) {
        trigger_watches(key);
    }
    *committed_version = committed_version_;
    return TxnErrorCode::TXN_OK;
}
// Fills the 10-byte version placeholder embedded in `*str`.
//
// On entry the last 4 bytes of `*str` hold a uint32 offset (copied verbatim, in
// host byte order). Those 4 bytes are stripped, then the 8 byte-swapped bytes of
// `ver` followed by the 2 byte-swapped bytes of `seq` are written at that offset.
//
// @return 0 on success; -1 if `*str` is shorter than 14 bytes (left unmodified)
//         or if the offset does not leave room for 10 bytes (in which case the
//         trailing 4 bytes have already been stripped).
int MemTxnKv::gen_version_timestamp(int64_t ver, int16_t seq, std::string* str) {
    // Convert little endian to big endian
    static auto to_big_int64 = [](int64_t v) {
        v = ((v & 0xffffffff00000000) >> 32) | ((v & 0x00000000ffffffff) << 32);
        v = ((v & 0xffff0000ffff0000) >> 16) | ((v & 0x0000ffff0000ffff) << 16);
        v = ((v & 0xff00ff00ff00ff00) >> 8) | ((v & 0x00ff00ff00ff00ff) << 8);
        return v;
    };
    static auto to_big_int16 = [](int16_t v) {
        v = ((v & 0xff00) >> 8) | ((v & 0x00ff) << 8);
        return v;
    };
    ver = to_big_int64(ver);
    seq = to_big_int16(seq);

    const size_t size = str->size();
    if (size < 14) {
        LOG(WARNING) << "gen_version_timestamp: str size is too small, size: " << size
                     << ", required: 14";
        return -1;
    }
    uint32_t offset = 0;
    std::memcpy(&offset, str->data() + size - 4, sizeof(offset));
    str->resize(size - 4, '\0');
    // str->size() is >= 10 here. Compare without computing `offset + 10`, which
    // would wrap around in 32-bit arithmetic for offsets close to UINT32_MAX and
    // let an out-of-bounds memcpy through.
    if (offset > str->size() - 10) {
        LOG(WARNING) << "gen_version_timestamp: offset + 10 > str size, offset: " << offset
                     << ", str size: " << size;
        return -1;
    }
    std::memcpy(str->data() + offset, &ver, sizeof(ver));
    std::memcpy(str->data() + offset + 8, &seq, sizeof(seq));
    return 0;
}
// Returns the store's current commit version, read under lock_.
int64_t MemTxnKv::get_last_commited_version() {
    std::lock_guard<std::mutex> guard(lock_);
    const int64_t latest = committed_version_;
    return latest;
}
// Sets read_version_ to the current commit version (under lock_) and returns it.
int64_t MemTxnKv::get_last_read_version() {
    std::lock_guard<std::mutex> guard(lock_);
    const int64_t latest = committed_version_;
    read_version_ = latest;
    return read_version_;
}
std::unique_ptr<FullRangeGetIterator> MemTxnKv::full_range_get(std::string begin, std::string end,
FullRangeGetOptions opts) {
return std::make_unique<memkv::FullRangeGetIterator>(std::move(begin), std::move(end),
std::move(opts));
}
// Adds `watch_info` to the watches registered for `key` (under lock_).
void MemTxnKv::register_watch(const std::string& key, std::shared_ptr<WatchInfo> watch_info) {
    std::lock_guard<std::mutex> guard(lock_);
    auto& key_watches = watches_[key]; // creates the entry on first use
    key_watches.push_back(std::move(watch_info));
}
void MemTxnKv::trigger_watches(const std::string& key) {
// Must be called with lock_ held
auto it = watches_.find(key);
if (it == watches_.end()) {
return;
}
// Get the current version for this key
int64_t current_version = -1;
auto kv_it = mem_kv_.find(key);
if (kv_it != mem_kv_.end() && !kv_it->second.empty()) {
current_version = kv_it->second.front().commit_version;
}
// Trigger and remove watches where the version has changed
std::vector<std::shared_ptr<WatchInfo>> to_trigger;
auto& watch_list = it->second;
for (auto iter = watch_list.begin(); iter != watch_list.end();) {
auto& watch = *iter;
// Trigger if the current version is greater than the watch version
// This means the key has been modified since the watch was set
if (current_version > watch->watch_version) {
to_trigger.push_back(watch);
iter = watch_list.erase(iter);
} else {
++iter;
}
}
if (watch_list.empty()) {
watches_.erase(it);
}
for (auto& watch : to_trigger) {
std::lock_guard<std::mutex> watch_lock(watch->mutex);
watch->triggered = true;
watch->cv.notify_all();
}
}
} // namespace doris::cloud
namespace doris::cloud::memkv {
// =============================================================================
// Impl of Transaction
// =============================================================================
// Binds the transaction to `kv` and captures the store's current commit version
// as this transaction's read version.
Transaction::Transaction(std::shared_ptr<MemTxnKv> kv) : kv_(std::move(kv)) {
    std::lock_guard<std::mutex> l(lock_);
    // Read the commit version through the locked accessor: MemTxnKv::update()
    // modifies committed_version_ while holding the store's lock_, so a direct
    // member read here would be unsynchronized.
    read_version_ = kv_->get_last_commited_version();
}
// Nothing to initialize for an in-memory transaction; always returns 0.
int Transaction::init() {
return 0;
}
// Buffers a PUT of `key` -> `val`: records it in writes_ (so later reads in this
// transaction see it) and in op_list_ (applied at commit), and updates the
// per-transaction and store-wide put statistics.
void Transaction::put(std::string_view key, std::string_view val) {
    std::lock_guard<std::mutex> guard(lock_);
    const std::string owned_key(key);
    const std::string owned_val(val);
    const size_t payload_bytes = key.size() + val.size();

    writes_.insert_or_assign(owned_key, owned_val);
    op_list_.emplace_back(ModifyOpType::PUT, owned_key, owned_val);

    ++num_put_keys_;
    kv_->put_count_++;
    kv_->put_bytes_ += payload_bytes;
    put_bytes_ += payload_bytes;
    approximate_bytes_ += payload_bytes;
}
// Point read of `key` into `*val`.
// Reading a key registered in unreadable_ke_ set (keys written by the atomic
// versionstamp operations) marks the transaction aborted and returns
// TXN_UNIDENTIFIED_ERROR; otherwise the result of inner_get() is returned.
TxnErrorCode Transaction::get(std::string_view key, std::string* val, bool snapshot) {
    std::lock_guard<std::mutex> guard(lock_);
    const std::string lookup_key(key);
    // A key set by atomic_xxx cannot be read before the txn is committed;
    // once it is read, the txn can no longer commit.
    if (unreadable_keys_.find(lookup_key) == unreadable_keys_.end()) {
        return inner_get(lookup_key, val, snapshot);
    }
    aborted_ = true;
    LOG(WARNING) << "read unreadable key, abort";
    return TxnErrorCode::TXN_UNIDENTIFIED_ERROR;
}
// Range read over [begin, end) with the given options; the result iterator is
// stored in `*iter`. Only the begin key is checked against unreadable_keys_;
// if it is unreadable the transaction is marked aborted and
// TXN_UNIDENTIFIED_ERROR is returned.
TxnErrorCode Transaction::get(std::string_view begin, std::string_view end,
                              std::unique_ptr<cloud::RangeGetIterator>* iter,
                              const RangeGetOptions& opts) {
    // Work on a copy so the sync point can override the batch limit.
    RangeGetOptions options = opts;
    TEST_SYNC_POINT_CALLBACK("memkv::Transaction::get", &options.batch_limit);

    std::lock_guard<std::mutex> guard(lock_);
    const std::string range_begin(begin);
    const std::string range_end(end);
    // TODO: figure out what happen if range_get has part of unreadable_keys
    const bool begin_unreadable = unreadable_keys_.find(range_begin) != unreadable_keys_.end();
    if (begin_unreadable) {
        aborted_ = true;
        LOG(WARNING) << "read unreadable key, abort";
        return TxnErrorCode::TXN_UNIDENTIFIED_ERROR;
    }
    return inner_get(range_begin, range_end, iter, options);
}
// Creates a full-range iterator over [begin, end) that reads through this
// transaction: `opts.txn` is pointed at this object and `opts.txn_kv` is cleared
// before delegating to the store.
std::unique_ptr<cloud::FullRangeGetIterator> Transaction::full_range_get(
        std::string_view begin, std::string_view end, cloud::FullRangeGetOptions opts) {
    opts.txn_kv.reset();
    opts.txn = this;
    std::string range_begin(begin);
    std::string range_end(end);
    return kv_->full_range_get(std::move(range_begin), std::move(range_end), std::move(opts));
}
// Point read without locking or unreadable-key checks (done by the callers).
// Lookup order: this transaction's buffered writes, then the store at
// read_version_; a key found in the store but covered by one of this
// transaction's remove ranges is reported as TXN_KEY_NOT_FOUND.
// Unless `snapshot` is set, the key is added to read_set_ when the store is
// consulted. Get statistics are updated along the way.
TxnErrorCode Transaction::inner_get(const std::string& key, std::string* val, bool snapshot) {
    num_get_keys_++;
    kv_->get_count_++;

    // Read your writes.
    if (auto buffered = writes_.find(key); buffered != writes_.end()) {
        *val = buffered->second;
        return TxnErrorCode::TXN_OK;
    }

    if (!snapshot) {
        read_set_.emplace(key);
    }
    const TxnErrorCode err = kv_->get_kv(key, val, read_version_);
    if (err != TxnErrorCode::TXN_OK) {
        return err;
    }

    // Hide keys this transaction has removed.
    for (const auto& removed : remove_ranges_) {
        const bool covered = !(key < removed.first) && key < removed.second;
        if (covered) {
            return TxnErrorCode::TXN_KEY_NOT_FOUND;
        }
    }

    const size_t read_bytes = val->size() + key.size();
    get_bytes_ += read_bytes;
    kv_->get_bytes_ += read_bytes;
    return TxnErrorCode::TXN_OK;
}
// Range read without locking or unreadable-key checks (done by the callers).
//
// Steps:
//   1. read the range from the store at read_version_;
//   2. drop keys covered by this transaction's remove ranges;
//   3. unless opts.snapshot is set, add the remaining keys to read_set_;
//   4. overlay this transaction's buffered writes that fall in the range;
//   5. order the result (reversed if opts.reverse), apply opts.batch_limit and
//      update the get statistics.
//
// @param iter  receives an iterator over the merged result.
// @return the store's error code if the store read fails, else TXN_OK.
TxnErrorCode Transaction::inner_get(const std::string& begin, const std::string& end,
                                    std::unique_ptr<cloud::RangeGetIterator>* iter,
                                    const RangeGetOptions& opts) {
    bool more = false;
    std::vector<std::pair<std::string, std::string>> kv_list;
    TxnErrorCode err = kv_->get_kv(begin, end, read_version_, opts, &more, &kv_list);
    if (err != TxnErrorCode::TXN_OK) {
        return err;
    }

    // Drop keys removed by this transaction. std::erase_if is used instead of a
    // hand-written erase loop with a cached end iterator, because vector::erase
    // invalidates that cached iterator.
    std::erase_if(kv_list, [this](const std::pair<std::string, std::string>& kv) {
        for (const auto& removed : remove_ranges_) {
            if (removed.first <= kv.first && kv.first < removed.second) {
                return true;
            }
        }
        return false;
    });

    if (!opts.snapshot) {
        for (const auto& kv : kv_list) {
            read_set_.insert(kv.first);
        }
    }

    std::map<std::string, std::string> kv_map;
    for (const auto& [key, value] : kv_list) {
        kv_map[key] = value;
    }

    // Get writes in the range and apply key selectors
    auto apply_key_selector = [&](RangeKeySelector selector,
                                  const std::string& key) -> decltype(writes_.lower_bound(key)) {
        auto iter = writes_.lower_bound(key);
        switch (selector) {
        case RangeKeySelector::FIRST_GREATER_OR_EQUAL:
            break;
        case RangeKeySelector::FIRST_GREATER_THAN:
            if (iter != writes_.end() && iter->first == key) {
                ++iter;
            }
            break;
        case RangeKeySelector::LAST_LESS_OR_EQUAL:
            // lower_bound() may return end(); never dereference it.
            if (iter != writes_.begin() && (iter == writes_.end() || iter->first != key)) {
                --iter;
            }
            break;
        case RangeKeySelector::LAST_LESS_THAN:
            if (iter != writes_.begin()) {
                --iter;
            }
            break;
        }
        return iter;
    };
    auto begin_iter = apply_key_selector(opts.begin_key_selector, begin);
    auto end_iter = apply_key_selector(opts.end_key_selector, end);

    // Overlay buffered writes. The end_iter is exclusive, so iteration continues
    // only while begin_iter is a real element that is still before end_iter
    // (or end_iter is the end of writes_).
    for (; begin_iter != writes_.end() && begin_iter != end_iter &&
           (end_iter == writes_.end() || begin_iter->first < end_iter->first);
         ++begin_iter) {
        kv_map[begin_iter->first] = begin_iter->second;
    }

    // Rebuild the list in the requested order.
    if (!opts.reverse) {
        kv_list.assign(kv_map.begin(), kv_map.end());
    } else {
        kv_list.assign(kv_map.rbegin(), kv_map.rend());
    }

    if (opts.batch_limit > 0 && kv_list.size() > static_cast<size_t>(opts.batch_limit)) {
        more = true;
        kv_list.resize(opts.batch_limit);
    }

    num_get_keys_ += kv_list.size();
    kv_->get_count_ += kv_list.size();
    for (const auto& [k, v] : kv_list) {
        get_bytes_ += k.size() + v.size();
        kv_->get_bytes_ += k.size() + v.size();
    }
    *iter = std::make_unique<memkv::RangeGetIterator>(std::move(kv_list), more);
    return TxnErrorCode::TXN_OK;
}
void Transaction::atomic_set_ver_key(std::string_view key_prefix, std::string_view val) {
std::lock_guard<std::mutex> l(lock_);
kv_->put_count_++;
std::string k(key_prefix.data(), key_prefix.size());
std::string v(val.data(), val.size());
uint32_t prefix_size = k.size();
// ATTN:
// 10 bytes for versiontimestamp must be 0, trailing 4 bytes is for
// prefix len
k.resize(k.size() + 14, '\0');
std::memcpy(k.data() + (k.size() - 4), &prefix_size, 4);
unreadable_keys_.insert(k);
op_list_.emplace_back(ModifyOpType::ATOMIC_SET_VER_KEY, k, v);
++num_put_keys_;
kv_->put_bytes_ += k.size() + val.size();
put_bytes_ += k.size() + val.size();
approximate_bytes_ += k.size() + val.size();
}
// Buffers an ATOMIC_SET_VER_KEY operation for a caller-built key whose 10-byte
// version placeholder starts at `offset`. The offset is appended to the op key
// as 4 raw bytes, and the op key is recorded in unreadable_keys_.
//
// @return false (nothing buffered) when `key` cannot hold a 10-byte placeholder
//         at `offset`; true otherwise.
bool Transaction::atomic_set_ver_key(std::string_view key, uint32_t offset, std::string_view val) {
    // Compare without computing `offset + 10`, which wraps around in 32-bit
    // arithmetic for offsets close to UINT32_MAX and would defeat the check.
    if (key.size() < 10 || offset > key.size() - 10) {
        LOG(WARNING) << "atomic_set_ver_key: invalid key or offset, key=" << key
                     << " offset=" << offset << ", key_size=" << key.size();
        return false;
    }
    std::lock_guard<std::mutex> l(lock_);
    kv_->put_count_++;
    std::string k(key.data(), key.size());
    std::string v(val.data(), val.size());
    k.append(reinterpret_cast<const char*>(&offset), sizeof(offset)); // ATTN: assume little-endian
    unreadable_keys_.insert(k);
    op_list_.emplace_back(ModifyOpType::ATOMIC_SET_VER_KEY, k, v);
    ++num_put_keys_;
    // Keep the store-wide byte counter in step, as the other write operations do.
    kv_->put_bytes_ += k.size() + v.size();
    put_bytes_ += k.size() + v.size();
    approximate_bytes_ += k.size() + v.size();
    return true;
}
// Buffers an ATOMIC_SET_VER_VAL operation: the op value is `value` followed by
// a 10-byte zeroed version placeholder and 4 trailing bytes holding the length
// of `value`, i.e. the placeholder's offset. `key` is recorded in
// unreadable_keys_.
void Transaction::atomic_set_ver_value(std::string_view key, std::string_view value) {
    std::lock_guard<std::mutex> l(lock_);
    kv_->put_count_++;
    std::string k(key.data(), key.size());
    std::string v(value.data(), value.size());
    // Use a 32-bit integer for the prefix length, like atomic_set_ver_key does:
    // copying the first 4 bytes of a wider size_t only yields the intended value
    // on little-endian hosts.
    const auto prefix_size = static_cast<uint32_t>(v.size());
    // ATTN:
    // 10 bytes for versiontimestamp must be 0, trailing 4 bytes is for
    // prefix len
    v.resize(v.size() + 14, '\0');
    std::memcpy(v.data() + (v.size() - 4), &prefix_size, sizeof(prefix_size));
    unreadable_keys_.insert(k);
    op_list_.emplace_back(ModifyOpType::ATOMIC_SET_VER_VAL, k, v);
    ++num_put_keys_;
    kv_->put_bytes_ += key.size() + value.size();
    put_bytes_ += key.size() + value.size();
    approximate_bytes_ += key.size() + value.size();
}
void Transaction::atomic_add(std::string_view key, int64_t to_add) {
std::string k(key.data(), key.size());
std::string v(sizeof(to_add), '\0');
memcpy(v.data(), &to_add, sizeof(to_add));
std::lock_guard<std::mutex> l(lock_);
kv_->put_count_++;
op_list_.emplace_back(ModifyOpType::ATOMIC_ADD, std::move(k), std::move(v));
++num_put_keys_;
put_bytes_ += key.size() + 8;
kv_->put_bytes_ += key.size() + 8;
approximate_bytes_ += key.size() + 8;
}
// Decodes a raw 8-byte integer (host byte order) from `data` into `*val`.
// Returns false, leaving `*val` untouched, when `data` is not exactly 8 bytes.
bool Transaction::decode_atomic_int(std::string_view data, int64_t* val) {
    const bool well_formed = (data.size() == sizeof(int64_t));
    if (well_formed) {
        memcpy(val, data.data(), sizeof(*val));
    }
    return well_formed;
}
// Buffers the removal of a single key: drops any buffered write for it, records
// the one-key range [key, key + '\0') in remove_ranges_ so later reads in this
// transaction no longer see the stored value, and queues a REMOVE operation.
void Transaction::remove(std::string_view key) {
    std::lock_guard<std::mutex> guard(lock_);
    kv_->del_count_++;

    const std::string target(key);
    writes_.erase(target);

    // Smallest key strictly greater than `target`, used as the exclusive range end.
    std::string range_end(target);
    range_end += '\0';
    remove_ranges_.emplace_back(target, range_end);
    op_list_.emplace_back(ModifyOpType::REMOVE, target, "");

    ++num_del_keys_;
    const size_t key_bytes = key.size();
    kv_->del_bytes_ += key_bytes;
    delete_bytes_ += key_bytes;
    approximate_bytes_ += key_bytes;
}
// Buffers the removal of the key range [begin, end). An empty or inverted range
// is logged and marks the transaction aborted. Delete statistics are updated in
// both cases.
void Transaction::remove(std::string_view begin, std::string_view end) {
    std::lock_guard<std::mutex> guard(lock_);
    const std::string range_begin(begin);
    const std::string range_end(end);

    if (range_begin < range_end) {
        // ATTN: we do not support read your writes about delete range.
        auto first = writes_.lower_bound(range_begin);
        auto last = writes_.lower_bound(range_end);
        writes_.erase(first, last);
        remove_ranges_.emplace_back(range_begin, range_end);
        op_list_.emplace_back(ModifyOpType::REMOVE_RANGE, range_begin, range_end);
    } else {
        LOG(WARNING) << "invalid remove range: [" << hex(range_begin) << ", " << hex(range_end)
                     << ")";
        aborted_ = true;
    }

    // same as normal txn
    kv_->del_count_ += 2;
    num_del_keys_ += 2;
    const size_t range_bytes = begin.size() + end.size();
    kv_->del_bytes_ += range_bytes;
    delete_bytes_ += range_bytes;
    approximate_bytes_ += range_bytes;
}
// Applies the buffered operations to the store via MemTxnKv::update().
// Returns TXN_UNIDENTIFIED_ERROR if the transaction was marked aborted, the
// store's error code if the update fails, and TXN_OK otherwise. On success the
// buffered state is cleared and, if enabled, a versionstamp is derived from the
// committed version.
TxnErrorCode Transaction::commit() {
    std::lock_guard<std::mutex> guard(lock_);
    if (aborted_) {
        LOG(WARNING) << "transaction aborted, cannot commit";
        return TxnErrorCode::TXN_UNIDENTIFIED_ERROR;
    }

    const TxnErrorCode rc = kv_->update(read_set_, op_list_, read_version_, &committed_version_);
    if (rc != TxnErrorCode::TXN_OK) {
        LOG(WARNING) << "transaction commit failed, code=" << static_cast<int>(rc);
        return rc;
    }
    commited_ = true;

    if (versionstamp_enabled_) {
        // MemTxnKv has no real 10-byte versionstamp, so a fake one is built from
        // committed_version_.
        versionstamp_result_ = Versionstamp(static_cast<uint64_t>(committed_version_), 0);
    }

    // Drop everything that was buffered for this commit.
    writes_.clear();
    remove_ranges_.clear();
    read_set_.clear();
    op_list_.clear();
    return TxnErrorCode::TXN_OK;
}
// Writes this transaction's read version to `*version`; always returns TXN_OK.
TxnErrorCode Transaction::get_read_version(int64_t* version) {
    {
        std::lock_guard<std::mutex> guard(lock_);
        *version = read_version_;
    }
    return TxnErrorCode::TXN_OK;
}
// Writes the version assigned at commit to `*version`.
// Returns TXN_UNIDENTIFIED_ERROR (leaving `*version` untouched) if the
// transaction has not been committed successfully.
TxnErrorCode Transaction::get_committed_version(int64_t* version) {
    std::lock_guard<std::mutex> guard(lock_);
    if (commited_) {
        *version = committed_version_;
        return TxnErrorCode::TXN_OK;
    }
    return TxnErrorCode::TXN_UNIDENTIFIED_ERROR;
}
// Commits this transaction, then blocks until `key` is modified at a version
// newer than this transaction's read version.
// Returns the commit error if the commit fails. Returns TXN_OK immediately when
// the key already has a newer version, otherwise after the registered watch has
// been triggered.
TxnErrorCode Transaction::watch_key(std::string_view key) {
    const std::string watched_key(key);

    // Commit the transaction
    if (const TxnErrorCode commit_code = commit(); commit_code != TxnErrorCode::TXN_OK) {
        return commit_code;
    }

    const int64_t watch_version = read_version_;
    auto watch_info = std::make_shared<MemTxnKv::WatchInfo>();
    watch_info->watch_version = watch_version;

    // The "already changed?" check and the registration happen under the store
    // lock, so a commit cannot slip in between the two.
    {
        std::lock_guard<std::mutex> store_guard(kv_->lock_);
        const auto stored = kv_->mem_kv_.find(watched_key);
        const bool has_versions = stored != kv_->mem_kv_.end() && !stored->second.empty();
        if (has_versions && stored->second.front().commit_version > watch_version) {
            // Key has already changed, return immediately without blocking
            return TxnErrorCode::TXN_OK;
        }
        // Register the watch only if the key hasn't changed yet
        kv_->watches_[watched_key].push_back(watch_info);
    }

    // Wait for the watch to be triggered
    std::unique_lock<std::mutex> watch_lock(watch_info->mutex);
    watch_info->cv.wait(watch_lock, [&watch_info] { return watch_info->triggered; });
    return TxnErrorCode::TXN_OK;
}
// No-op for the in-memory transaction: no state is touched and TXN_OK is
// always returned.
TxnErrorCode Transaction::abort() {
return TxnErrorCode::TXN_OK;
}
// Sets versionstamp_enabled_, which makes commit() record a versionstamp and
// get_versionstamp() willing to return it.
void Transaction::enable_get_versionstamp() {
versionstamp_enabled_ = true;
}
// Copies the versionstamp recorded at commit into `*versionstamp`.
// Returns TXN_INVALID_ARGUMENT if versionstamps were not enabled, and
// TXN_KEY_NOT_FOUND if the recorded value still equals a default-constructed
// Versionstamp.
TxnErrorCode Transaction::get_versionstamp(Versionstamp* versionstamp) {
    if (!versionstamp_enabled_) {
        LOG(WARNING) << "get_versionstamp called but versionstamp not enabled";
        return TxnErrorCode::TXN_INVALID_ARGUMENT;
    }
    const bool unset = (versionstamp_result_ == Versionstamp());
    if (unset) {
        LOG(WARNING) << "versionstamp not available, commit may not have been called or failed";
        return TxnErrorCode::TXN_KEY_NOT_FOUND;
    }
    *versionstamp = versionstamp_result_;
    return TxnErrorCode::TXN_OK;
}
// Point-reads every key in `keys`, appending one entry per key to `*res`: the
// value on success, std::nullopt when inner_get() reports anything else.
// Hitting a key in unreadable_keys_ marks the transaction aborted and returns
// TXN_UNIDENTIFIED_ERROR, leaving the entries appended so far in `*res`.
TxnErrorCode Transaction::batch_get(std::vector<std::optional<std::string>>* res,
                                    const std::vector<std::string>& keys,
                                    const BatchGetOptions& opts) {
    if (keys.empty()) {
        return TxnErrorCode::TXN_OK;
    }
    std::lock_guard<std::mutex> guard(lock_);
    res->reserve(keys.size());
    for (const auto& key : keys) {
        if (unreadable_keys_.find(key) != unreadable_keys_.end()) {
            aborted_ = true;
            LOG(WARNING) << "read unreadable key, abort";
            return TxnErrorCode::TXN_UNIDENTIFIED_ERROR;
        }
        std::string value;
        const TxnErrorCode rc = inner_get(key, &value, opts.snapshot);
        if (rc == TxnErrorCode::TXN_OK) {
            res->push_back(value);
        } else {
            res->push_back(std::nullopt);
        }
    }
    // NOTE(review): inner_get() already bumps both counters once per key, so the
    // additions below look like they count each key twice - confirm intent.
    kv_->get_count_ += keys.size();
    num_get_keys_ += keys.size();
    return TxnErrorCode::TXN_OK;
}
// For each [start, end) range, appends to `*res` the first key/value pair the
// range read yields (honoring opts.reverse), or std::nullopt if it yields none.
// A start key found in unreadable_keys_ marks the transaction aborted and
// returns TXN_UNIDENTIFIED_ERROR; a failing range read returns its error code.
TxnErrorCode Transaction::batch_scan(
        std::vector<std::optional<std::pair<std::string, std::string>>>* res,
        const std::vector<std::pair<std::string, std::string>>& ranges,
        const BatchGetOptions& opts) {
    if (ranges.empty()) {
        return TxnErrorCode::TXN_OK;
    }
    std::lock_guard<std::mutex> guard(lock_);
    res->reserve(ranges.size());

    for (const auto& range : ranges) {
        const std::string& start_key = range.first;
        const std::string& end_key = range.second;
        if (unreadable_keys_.find(start_key) != unreadable_keys_.end()) {
            aborted_ = true;
            LOG(WARNING) << "read unreadable key, abort";
            return TxnErrorCode::TXN_UNIDENTIFIED_ERROR;
        }

        // Only the first pair of each range is needed, hence the batch limit of 1.
        RangeGetOptions range_opts;
        range_opts.snapshot = opts.snapshot;
        range_opts.batch_limit = 1;
        range_opts.reverse = opts.reverse;
        range_opts.begin_key_selector = RangeKeySelector::FIRST_GREATER_OR_EQUAL;
        range_opts.end_key_selector = RangeKeySelector::FIRST_GREATER_OR_EQUAL;

        std::unique_ptr<cloud::RangeGetIterator> iter;
        const TxnErrorCode rc = inner_get(start_key, end_key, &iter, range_opts);
        if (rc != TxnErrorCode::TXN_OK) {
            return rc;
        }
        if (!iter->has_next()) {
            res->push_back(std::nullopt);
            continue;
        }
        auto [found_key, found_value] = iter->next();
        res->push_back(std::make_pair(std::string(found_key), std::string(found_value)));
    }

    kv_->get_count_ += ranges.size();
    num_get_keys_ += ranges.size();
    return TxnErrorCode::TXN_OK;
}
// Stores the scan options and the range bounds by moving them into the
// iterator. No data is read here; the range read is issued by the first
// has_next() call.
FullRangeGetIterator::FullRangeGetIterator(std::string begin, std::string end,
FullRangeGetOptions opts)
: opts_(std::move(opts)), begin_(std::move(begin)), end_(std::move(end)) {}
// Defaulted destructor, defined out of line in this translation unit.
FullRangeGetIterator::~FullRangeGetIterator() = default;
bool FullRangeGetIterator::has_next() {
if (!is_valid_) {
return false;
}
if (!inner_iter_) {
auto* txn = opts_.txn;
if (!txn) {
// Create a new txn for each inner range get
std::unique_ptr<cloud::Transaction> txn1;
TxnErrorCode err = opts_.txn_kv->create_txn(&txn_);
if (err != TxnErrorCode::TXN_OK) {
is_valid_ = false;
code_ = err;
return false;
}
txn = txn_.get();
}
// For simplicity, we always get the entire range without batch limit.
RangeGetOptions opts;
opts.snapshot = opts_.snapshot;
opts.batch_limit = 0;
opts.reverse = opts_.reverse;
opts.begin_key_selector = opts_.begin_key_selector;
opts.end_key_selector = opts_.end_key_selector;
TxnErrorCode err = txn->get(begin_, end_, &inner_iter_, opts);
if (err != TxnErrorCode::TXN_OK) {
is_valid_ = false;
code_ = err;
return false;
}
}
return inner_iter_->has_next();
}
// Returns the next key/value pair from the inner iterator, or std::nullopt when
// has_next() reports that nothing is available.
std::optional<std::pair<std::string_view, std::string_view>> FullRangeGetIterator::next() {
    if (has_next()) {
        return inner_iter_->next();
    }
    return std::nullopt;
}
// Returns the inner iterator's peek() result, or std::nullopt when has_next()
// reports that nothing is available.
std::optional<std::pair<std::string_view, std::string_view>> FullRangeGetIterator::peek() {
    if (has_next()) {
        return inner_iter_->peek();
    }
    return std::nullopt;
}
} // namespace doris::cloud::memkv