// blob: 84593de3c200f8d3540e3d08d9b23263a2388593 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "mem_txn_kv.h"
#include <glog/logging.h>
#include <cstdint>
#include <cstring>
#include <memory>
#include <mutex>
#include <optional>
#include <ostream>
#include <string>
#include "cpp/sync_point.h"
#include "meta-service/txn_kv_error.h"
#include "txn_kv.h"
namespace doris::cloud {
// Initializes the in-memory store. There is nothing to set up, so this
// always returns 0.
int MemTxnKv::init() {
    return 0;
}
// Creates a new in-memory transaction that shares ownership of this store and
// hands it to the caller through `*txn`. Always returns TXN_OK.
TxnErrorCode MemTxnKv::create_txn(std::unique_ptr<Transaction>* txn) {
    txn->reset(new memkv::Transaction(this->shared_from_this()));
    return TxnErrorCode::TXN_OK;
}
// Point lookup of `key` as visible at `version`.
//
// The version list of a key is scanned from the front (update() pushes new
// versions at the front); the first version committed at or before `version`
// decides the outcome. If that version carries a value it is copied into
// `*val` and TXN_OK is returned. TXN_KEY_NOT_FOUND is returned when the key is
// unknown, when no version is visible, or when the visible version has no
// value (a removal).
TxnErrorCode MemTxnKv::get_kv(const std::string& key, std::string* val, int64_t version) {
    std::lock_guard<std::mutex> guard(lock_);
    const auto found = mem_kv_.find(key);
    if (found == mem_kv_.end()) {
        return TxnErrorCode::TXN_KEY_NOT_FOUND;
    }
    // An empty version list simply falls through to the not-found return.
    for (const auto& ver : found->second) {
        if (ver.commit_version > version) {
            continue; // committed after the requested version, not visible
        }
        if (!ver.value.has_value()) {
            return TxnErrorCode::TXN_KEY_NOT_FOUND;
        }
        *val = *ver.value;
        return TxnErrorCode::TXN_OK;
    }
    return TxnErrorCode::TXN_KEY_NOT_FOUND;
}
// Range lookup over [begin, end) as visible at `version`.
//
// begin/end: half-open key range; an empty or inverted range yields TXN_OK
//            without touching `*more` or `*kv_list`.
// limit:     maximum number of pairs to collect; 0 means unlimited, a negative
//            value is rejected with TXN_UNIDENTIFIED_ERROR.
// more:      set to true when the limit was reached and the store still holds
//            a further key inside the range (that key is not checked for
//            visibility).
// kv_list:   receives the visible key/value pairs.
TxnErrorCode MemTxnKv::get_kv(const std::string& begin, const std::string& end, int64_t version,
                              int limit, bool* more, std::map<std::string, std::string>* kv_list) {
    if (begin >= end) {
        return TxnErrorCode::TXN_OK;
    }
    if (limit < 0) {
        return TxnErrorCode::TXN_UNIDENTIFIED_ERROR;
    }
    const bool use_limit = (limit != 0);

    std::unique_lock<std::mutex> guard(lock_);
    *more = false;
    const auto range_end = mem_kv_.lower_bound(end);
    auto cursor = mem_kv_.lower_bound(begin);
    while (cursor != mem_kv_.end() && cursor != range_end) {
        // Only the first version committed at or before `version` matters.
        for (const auto& ver : cursor->second) {
            if (ver.commit_version > version) {
                continue;
            }
            if (ver.value.has_value()) {
                kv_list->insert_or_assign(cursor->first, *ver.value);
                --limit;
            }
            break;
        }
        if (use_limit && limit == 0) {
            break;
        }
        ++cursor;
    }
    // Reaching the limit means the loop stopped on a valid element, so it is
    // safe to step once more to see whether the range continues.
    if (use_limit && limit == 0 && ++cursor != range_end) {
        *more = true;
    }
    return TxnErrorCode::TXN_OK;
}
// Validates and applies one transaction's buffered operations under the store
// lock.
//
// read_set:          keys checked for conflicts.
// op_list:           mutations, applied in order.
// read_version:      version the committing transaction read at.
// committed_version: out-param, receives the version assigned to this commit.
//
// Returns TXN_CONFLICT (nothing is applied) when the newest logged
// modification of any key in `read_set` is newer than `read_version`;
// otherwise bumps the store version, applies every operation at that version
// and returns TXN_OK.
TxnErrorCode MemTxnKv::update(const std::set<std::string>& read_set,
                              const std::vector<OpTuple>& op_list, int64_t read_version,
                              int64_t* committed_version) {
    std::lock_guard<std::mutex> l(lock_);

    // check_conflict
    for (const auto& k : read_set) {
        auto iter = log_kv_.find(k);
        if (iter == log_kv_.end() || iter->second.empty()) {
            continue;
        }
        // Inspect the newest log entry in place; copying the whole log list
        // for every key in the read set is unnecessary.
        if (iter->second.front().commit_version_ > read_version) {
            LOG(WARNING) << "commit conflict";
            //keep the same behaviour with fdb.
            return TxnErrorCode::TXN_CONFLICT;
        }
    }

    ++committed_version_;
    // Every versionstamp generated by this commit uses the same sequence number.
    int16_t seq = 0;
    for (const auto& vec : op_list) {
        const auto& [op_type, k, v] = vec;
        LogItem log_item {op_type, committed_version_, k, v};
        log_kv_[k].push_front(std::move(log_item));
        switch (op_type) {
        case memkv::ModifyOpType::PUT: {
            mem_kv_[k].push_front(Version {committed_version_, v});
            break;
        }
        case memkv::ModifyOpType::ATOMIC_SET_VER_KEY: {
            // The stored key is the given prefix followed by the versionstamp.
            std::string ver_key(k);
            gen_version_timestamp(committed_version_, seq, &ver_key);
            mem_kv_[ver_key].push_front(Version {committed_version_, v});
            break;
        }
        case memkv::ModifyOpType::ATOMIC_SET_VER_VAL: {
            // The stored value is the given value followed by the versionstamp.
            std::string ver_val(v);
            gen_version_timestamp(committed_version_, seq, &ver_val);
            mem_kv_[k].push_front(Version {committed_version_, ver_val});
            break;
        }
        case memkv::ModifyOpType::ATOMIC_ADD: {
            auto& versions = mem_kv_[k];
            std::string org_val;
            if (!versions.empty()) {
                org_val = versions.front().value.value_or("");
            }
            if (org_val.size() != sizeof(int64_t)) {
                org_val.resize(sizeof(int64_t), '\0');
            }
            // Decode through memcpy: string buffers carry no int64 alignment
            // guarantee and casting them to int64_t* breaks aliasing rules.
            // Unsigned arithmetic makes overflow wrap instead of being UB.
            uint64_t current = 0;
            std::memcpy(&current, org_val.data(), sizeof(current));
            // Never read past the operand; a short operand is zero-extended.
            uint64_t delta = 0;
            const size_t delta_len = v.size() < sizeof(delta) ? v.size() : sizeof(delta);
            std::memcpy(&delta, v.data(), delta_len);
            const uint64_t sum = current + delta;
            std::memcpy(org_val.data(), &sum, sizeof(sum));
            versions.push_front(Version {committed_version_, org_val});
            break;
        }
        case memkv::ModifyOpType::REMOVE: {
            // A version without a value marks the key as removed.
            mem_kv_[k].push_front(Version {committed_version_, std::nullopt});
            break;
        }
        case memkv::ModifyOpType::REMOVE_RANGE: {
            // For this op, k is the range begin and v the (exclusive) range end.
            const auto end_iter = mem_kv_.lower_bound(v);
            for (auto it = mem_kv_.lower_bound(k); it != end_iter; ++it) {
                it->second.push_front(Version {committed_version_, std::nullopt});
            }
            break;
        }
        default:
            break;
        }
    }

    *committed_version = committed_version_;
    return TxnErrorCode::TXN_OK;
}
// Appends a 10-byte version stamp to `*str`: the 8 bytes of `ver` followed by
// the 2 bytes of `seq`, both most-significant byte first (big endian).
// Always returns 0.
int MemTxnKv::gen_version_timestamp(int64_t ver, int16_t seq, std::string* str) {
    // Emit the bytes with shifts so the layout does not depend on the host's
    // byte order.
    const auto uver = static_cast<uint64_t>(ver);
    const auto useq = static_cast<uint16_t>(seq);
    unsigned char stamp[10];
    for (int i = 0; i < 8; ++i) {
        stamp[i] = static_cast<unsigned char>((uver >> (56 - 8 * i)) & 0xff);
    }
    stamp[8] = static_cast<unsigned char>((useq >> 8) & 0xff);
    stamp[9] = static_cast<unsigned char>(useq & 0xff);
    str->append(reinterpret_cast<const char*>(stamp), sizeof(stamp));
    return 0;
}
// Returns the store's current committed version, read under the store lock.
int64_t MemTxnKv::get_last_commited_version() {
    std::lock_guard<std::mutex> guard(lock_);
    return committed_version_;
}
// Sets the store's read version to the current committed version and returns
// it. Done under the store lock.
int64_t MemTxnKv::get_last_read_version() {
    std::lock_guard<std::mutex> guard(lock_);
    const int64_t latest = committed_version_;
    read_version_ = latest;
    return read_version_;
}
// Builds an iterator over [begin, end) configured by `opts`. All three
// arguments are moved into the returned memkv::FullRangeGetIterator.
std::unique_ptr<FullRangeGetIterator> MemTxnKv::full_range_get(std::string begin, std::string end,
                                                               FullRangeGetIteratorOptions opts) {
    return std::make_unique<memkv::FullRangeGetIterator>(
            std::move(begin), std::move(end), std::move(opts));
}
} // namespace doris::cloud
namespace doris::cloud::memkv {
// =============================================================================
// Impl of Transaction
// =============================================================================
// Binds the transaction to `kv` and snapshots the store's committed version as
// this transaction's read version.
Transaction::Transaction(std::shared_ptr<MemTxnKv> kv) : kv_(std::move(kv)) {
    std::lock_guard<std::mutex> l(lock_);
    // The store's version counter is guarded by the store's own mutex, not by
    // this transaction's lock_. Read it through the locked accessor so the
    // read cannot race with a concurrent MemTxnKv::update().
    read_version_ = kv_->get_last_commited_version();
}
// Nothing to initialize for an in-memory transaction; always returns 0.
int Transaction::init() {
    return 0;
}
// Buffers a write of `val` to `key`. The pair is recorded in writes_ (so later
// reads in this transaction see it) and appended to the operation list that
// commit() hands to the store. Also updates the put/size counters.
void Transaction::put(std::string_view key, std::string_view val) {
    std::lock_guard<std::mutex> guard(lock_);
    const std::string key_str(key);
    const std::string val_str(val);
    writes_.insert_or_assign(key_str, val_str);
    op_list_.emplace_back(ModifyOpType::PUT, key_str, val_str);

    const auto bytes = key.size() + val.size();
    ++num_put_keys_;
    put_bytes_ += bytes;
    approximate_bytes_ += bytes;
}
// Point read of `key` into `*val`.
//
// A key targeted by atomic_set_ver_key()/atomic_set_ver_value() cannot be read
// before the transaction commits; attempting to do so marks the transaction
// aborted (it can no longer commit) and returns TXN_UNIDENTIFIED_ERROR.
// Otherwise the result of inner_get() is returned.
TxnErrorCode Transaction::get(std::string_view key, std::string* val, bool snapshot) {
    std::lock_guard<std::mutex> guard(lock_);
    const std::string key_str(key);
    if (unreadable_keys_.count(key_str) > 0) {
        aborted_ = true;
        LOG(WARNING) << "read unreadable key, abort";
        return TxnErrorCode::TXN_UNIDENTIFIED_ERROR;
    }
    return inner_get(key_str, val, snapshot);
}
// Range read over [begin, end); the result is delivered through `*iter`.
//
// Only the begin key is checked against the unreadable keys; reading it marks
// the transaction aborted and returns TXN_UNIDENTIFIED_ERROR. The sync point
// lets tests observe or override `limit` before the read happens.
TxnErrorCode Transaction::get(std::string_view begin, std::string_view end,
                              std::unique_ptr<cloud::RangeGetIterator>* iter, bool snapshot,
                              int limit) {
    TEST_SYNC_POINT_CALLBACK("memkv::Transaction::get", &limit);
    std::lock_guard<std::mutex> guard(lock_);
    const std::string range_begin(begin);
    const std::string range_end(end);
    // TODO: figure out what happens if the range contains some unreadable_keys
    if (unreadable_keys_.count(range_begin) > 0) {
        aborted_ = true;
        LOG(WARNING) << "read unreadable key, abort";
        return TxnErrorCode::TXN_UNIDENTIFIED_ERROR;
    }
    return inner_get(range_begin, range_end, iter, snapshot, limit);
}
// Point read without locking or unreadable-key checks.
//
// Resolution order: a pending write in this transaction wins; otherwise the
// store is read at read_version_ (recording the key in the read set unless
// `snapshot` is set); a value found in the store is hidden if the key lies in
// a range this transaction has removed.
TxnErrorCode Transaction::inner_get(const std::string& key, std::string* val, bool snapshot) {
    // Read your writes.
    if (const auto pending = writes_.find(key); pending != writes_.end()) {
        *val = pending->second;
        return TxnErrorCode::TXN_OK;
    }

    if (!snapshot) {
        read_set_.emplace(key);
    }
    const TxnErrorCode err = kv_->get_kv(key, val, read_version_);
    if (err != TxnErrorCode::TXN_OK) {
        return err;
    }

    // The store still holds the key, but this transaction may have removed it.
    for (const auto& [range_begin, range_end] : remove_ranges_) {
        if (range_begin <= key && key < range_end) {
            return TxnErrorCode::TXN_KEY_NOT_FOUND;
        }
    }
    return TxnErrorCode::TXN_OK;
}
// Range read over [begin, end) without locking or unreadable-key checks.
//
// Steps:
//   1. read the range from the store at read_version_ (honouring `limit`);
//   2. drop keys that fall in a range removed by this transaction;
//   3. unless `snapshot` is set, add the surviving keys to the read set;
//   4. overlay this transaction's pending writes inside the range.
// The merged result and the store's `more` flag are wrapped in a
// memkv::RangeGetIterator assigned to `*iter`. Returns the store's error code
// on failure, TXN_OK otherwise.
TxnErrorCode Transaction::inner_get(const std::string& begin, const std::string& end,
                                    std::unique_ptr<cloud::RangeGetIterator>* iter, bool snapshot,
                                    int limit) {
    bool more = false;
    std::map<std::string, std::string> kv_map;
    TxnErrorCode err = kv_->get_kv(begin, end, read_version_, limit, &more, &kv_map);
    if (err != TxnErrorCode::TXN_OK) {
        return err;
    }

    // Takes the key by reference: a pair<string, string> parameter would force
    // a copy of every pair<const string, string> map element.
    auto is_removed = [this](const std::string& key) {
        for (const auto& [range_begin, range_end] : remove_ranges_) {
            if (range_begin <= key && key < range_end) {
                return true;
            }
        }
        return false;
    };
    for (auto it = kv_map.begin(); it != kv_map.end();) {
        if (is_removed(it->first)) {
            it = kv_map.erase(it);
        } else {
            ++it;
        }
    }

    if (!snapshot) {
        for (const auto& kv : kv_map) {
            read_set_.insert(kv.first);
        }
    }

    // Overwrite by your writes. For an inverted range, lower_bound(begin) can
    // lie past lower_bound(end) and the walk would run off the end of writes_,
    // so the overlay only runs for a proper range.
    if (begin < end) {
        const auto writes_end = writes_.lower_bound(end);
        for (auto it = writes_.lower_bound(begin); it != writes_end; ++it) {
            kv_map.insert_or_assign(it->first, it->second);
        }
    }

    std::vector<std::pair<std::string, std::string>> kv_list(kv_map.begin(), kv_map.end());
    *iter = std::make_unique<memkv::RangeGetIterator>(std::move(kv_list), more);
    return TxnErrorCode::TXN_OK;
}
// Buffers an ATOMIC_SET_VER_KEY operation: at commit the store writes `val`
// under `key_prefix` extended with a versionstamp. The prefix is marked
// unreadable for the rest of this transaction. Also updates the put/size
// counters.
void Transaction::atomic_set_ver_key(std::string_view key_prefix, std::string_view val) {
    std::lock_guard<std::mutex> guard(lock_);
    const std::string prefix_str(key_prefix);
    const std::string val_str(val);
    unreadable_keys_.insert(prefix_str);
    op_list_.emplace_back(ModifyOpType::ATOMIC_SET_VER_KEY, prefix_str, val_str);

    const auto bytes = key_prefix.size() + val.size();
    ++num_put_keys_;
    put_bytes_ += bytes;
    approximate_bytes_ += bytes;
}
// Buffers an ATOMIC_SET_VER_VAL operation: at commit the store writes `value`
// extended with a versionstamp under `key`. The key is marked unreadable for
// the rest of this transaction. Also updates the put/size counters.
void Transaction::atomic_set_ver_value(std::string_view key, std::string_view value) {
    std::lock_guard<std::mutex> guard(lock_);
    const std::string key_str(key);
    const std::string value_str(value);
    unreadable_keys_.insert(key_str);
    op_list_.emplace_back(ModifyOpType::ATOMIC_SET_VER_VAL, key_str, value_str);

    const auto bytes = key.size() + value.size();
    ++num_put_keys_;
    put_bytes_ += bytes;
    approximate_bytes_ += bytes;
}
// Buffers an ATOMIC_ADD of `to_add` to the value stored under `key`. The
// operand travels in the operation list as the raw bytes of the int64 in host
// byte order. Also updates the put/size counters.
void Transaction::atomic_add(std::string_view key, int64_t to_add) {
    // Build the operands before taking the lock.
    std::string key_str(key);
    std::string operand(sizeof(to_add), '\0');
    memcpy(operand.data(), &to_add, sizeof(to_add));

    std::lock_guard<std::mutex> guard(lock_);
    op_list_.emplace_back(ModifyOpType::ATOMIC_ADD, std::move(key_str), std::move(operand));

    const auto bytes = key.size() + 8;
    ++num_put_keys_;
    put_bytes_ += bytes;
    approximate_bytes_ += bytes;
}
// Decodes a value as a raw int64: copies the bytes of `data` into `*val`.
// Returns false and leaves `*val` untouched unless `data` is exactly
// sizeof(int64_t) bytes long.
bool Transaction::decode_atomic_int(std::string_view data, int64_t* val) {
    const bool well_formed = (data.size() == sizeof(int64_t));
    if (well_formed) {
        memcpy(val, data.data(), sizeof(*val));
    }
    return well_formed;
}
// Buffers the removal of a single key. Any pending write to the key is
// dropped, and the key is recorded as the one-key range [key, key + '\0') so
// later reads in this transaction no longer see the stored value. Also updates
// the delete/size counters.
void Transaction::remove(std::string_view key) {
    std::lock_guard<std::mutex> guard(lock_);
    const std::string key_str(key);
    writes_.erase(key_str);

    // Appending a zero byte gives the exclusive end of a range that contains
    // exactly this key.
    std::string range_end(key_str);
    range_end.push_back(0x0);
    remove_ranges_.emplace_back(key_str, range_end);
    op_list_.emplace_back(ModifyOpType::REMOVE, key_str, "");

    ++num_del_keys_;
    delete_bytes_ += key.size();
    approximate_bytes_ += key.size();
}
// Buffers the removal of every key in [begin, end). An empty or inverted range
// marks the transaction aborted instead. The delete/size counters are updated
// in both cases.
void Transaction::remove(std::string_view begin, std::string_view end) {
    std::lock_guard<std::mutex> guard(lock_);
    const std::string range_begin(begin);
    const std::string range_end(end);
    if (range_begin < range_end) {
        // ATTN: we do not support read your writes about delete range.
        // Pending writes inside the range are dropped.
        writes_.erase(writes_.lower_bound(range_begin), writes_.lower_bound(range_end));
        remove_ranges_.emplace_back(range_begin, range_end);
        op_list_.emplace_back(ModifyOpType::REMOVE_RANGE, range_begin, range_end);
    } else {
        aborted_ = true;
    }

    const auto bytes = begin.size() + end.size();
    ++num_del_keys_;
    delete_bytes_ += bytes;
    approximate_bytes_ += bytes;
}
// Commits the buffered operations to the store.
//
// Returns TXN_UNIDENTIFIED_ERROR if the transaction was marked aborted, or the
// store's error code if MemTxnKv::update() fails (the buffered state is kept
// in that case). On success the committed version is recorded and all buffered
// state is cleared.
TxnErrorCode Transaction::commit() {
    std::lock_guard<std::mutex> guard(lock_);
    if (aborted_) {
        return TxnErrorCode::TXN_UNIDENTIFIED_ERROR;
    }

    const TxnErrorCode rc = kv_->update(read_set_, op_list_, read_version_, &committed_version_);
    if (rc == TxnErrorCode::TXN_OK) {
        commited_ = true;
        op_list_.clear();
        read_set_.clear();
        writes_.clear();
        remove_ranges_.clear();
    }
    return rc;
}
// Stores this transaction's read version in `*version`. Always returns TXN_OK.
TxnErrorCode Transaction::get_read_version(int64_t* version) {
    std::lock_guard<std::mutex> guard(lock_);
    *version = read_version_;
    return TxnErrorCode::TXN_OK;
}
// Stores the version assigned by a successful commit() in `*version`.
// Returns TXN_UNIDENTIFIED_ERROR, leaving `*version` untouched, if the
// transaction has not been committed.
TxnErrorCode Transaction::get_committed_version(int64_t* version) {
    std::lock_guard<std::mutex> guard(lock_);
    if (commited_) {
        *version = committed_version_;
        return TxnErrorCode::TXN_OK;
    }
    return TxnErrorCode::TXN_UNIDENTIFIED_ERROR;
}
// No-op: changes no state and always returns TXN_OK.
TxnErrorCode Transaction::abort() {
    return TxnErrorCode::TXN_OK;
}
// Reads every key in `keys`, appending one entry per key to `*res` in order:
// the value when inner_get() succeeds, std::nullopt otherwise.
//
// Encountering an unreadable key marks the transaction aborted and returns
// TXN_UNIDENTIFIED_ERROR; entries appended before that point stay in `*res`.
// An empty `keys` returns TXN_OK without touching `*res`.
TxnErrorCode Transaction::batch_get(std::vector<std::optional<std::string>>* res,
                                    const std::vector<std::string>& keys,
                                    const BatchGetOptions& opts) {
    if (keys.empty()) {
        return TxnErrorCode::TXN_OK;
    }

    std::lock_guard<std::mutex> guard(lock_);
    res->reserve(keys.size());
    for (const auto& key : keys) {
        if (unreadable_keys_.count(key) > 0) {
            aborted_ = true;
            LOG(WARNING) << "read unreadable key, abort";
            return TxnErrorCode::TXN_UNIDENTIFIED_ERROR;
        }
        std::string value;
        const TxnErrorCode rc = inner_get(key, &value, opts.snapshot);
        if (rc == TxnErrorCode::TXN_OK) {
            res->push_back(value);
        } else {
            res->push_back(std::nullopt);
        }
    }
    return TxnErrorCode::TXN_OK;
}
// Stores the options and the [begin, end) range; nothing is read from the
// store until has_next() is first called.
FullRangeGetIterator::FullRangeGetIterator(std::string begin, std::string end,
                                           FullRangeGetIteratorOptions opts)
        : opts_(std::move(opts)),
          begin_(std::move(begin)),
          end_(std::move(end)) {}
// Defaulted destructor, defined out of line.
FullRangeGetIterator::~FullRangeGetIterator() = default;
bool FullRangeGetIterator::has_next() {
if (!is_valid_) {
return false;
}
if (!inner_iter_) {
auto* txn = opts_.txn;
if (!txn) {
// Create a new txn for each inner range get
std::unique_ptr<cloud::Transaction> txn1;
TxnErrorCode err = opts_.txn_kv->create_txn(&txn_);
if (err != TxnErrorCode::TXN_OK) {
is_valid_ = false;
return false;
}
txn = txn_.get();
}
TxnErrorCode err = txn->get(begin_, end_, &inner_iter_, opts_.snapshot, 0);
if (err != TxnErrorCode::TXN_OK) {
is_valid_ = false;
return false;
}
}
return inner_iter_->has_next();
}
// Returns the next key/value pair, or std::nullopt once has_next() reports
// that nothing is left (including when the iterator has become invalid).
std::optional<std::pair<std::string_view, std::string_view>> FullRangeGetIterator::next() {
    if (has_next()) {
        return inner_iter_->next();
    }
    return std::nullopt;
}
} // namespace doris::cloud::memkv