| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #pragma once |
| |
| #include <foundationdb/fdb_c.h> |
| #include <foundationdb/fdb_c_options.g.h> |
| #include <gtest/gtest_prod.h> |
| |
| #include <atomic> |
| #include <cstddef> |
| #include <cstdint> |
| #include <memory> |
| #include <optional> |
| #include <string> |
| #include <string_view> |
| #include <thread> |
| #include <utility> |
| |
| #include "txn_kv_error.h" |
| #include "versionstamp.h" |
| |
| // ============================================================================= |
| // Abstract transactional key-value store interfaces |
| // ============================================================================= |
| |
| namespace doris::cloud { |
| |
| // Forward declarations: the option structs and interfaces below refer to these |
| // types before their full definitions appear later in this header. |
| class Transaction; |
| class RangeGetIterator; |
| class TxnKv; |
| |
| // A key selector is used to specify the position of a key in a range query. |
| // `RangeGetOptions` carries one selector for the begin and one for the end of a range. |
| enum class RangeKeySelector { |
| FIRST_GREATER_OR_EQUAL, |
| FIRST_GREATER_THAN, |
| LAST_LESS_OR_EQUAL, |
| LAST_LESS_THAN |
| }; |
| |
| // Options that control a single (paginated) range get. |
| struct RangeGetOptions { |
| // When true, the key range is left out of txn conflict detection for this read. |
| bool snapshot {false}; |
| // When non-zero, caps the number of key-value pairs to return. |
| int batch_limit {10000}; |
| // When true, the iterator yields keys in reverse order. |
| bool reverse {false}; |
| // Key selector applied to the beginning of the range. |
| RangeKeySelector begin_key_selector {RangeKeySelector::FIRST_GREATER_OR_EQUAL}; |
| // Key selector applied to the end of the range. |
| // |
| // REMEMBER: The end key is exclusive by default. |
| RangeKeySelector end_key_selector {RangeKeySelector::FIRST_GREATER_OR_EQUAL}; |
| }; |
| |
| /** |
| * Options for a full range get, i.e. for the `FullRangeGetIterator` handed out by |
| * `full_range_get()`. |
| * |
| * Unlike `RangeGetIterator`, which can only iterate within a page of range, a |
| * `FullRangeGetIterator` is capable of iterating over the entire specified range. |
| * |
| * Usage of the resulting iterator: |
| * for (auto kvp = it.next(); kvp.has_value(); kvp = it.next()) { |
| * auto [k, v] = *kvp; |
| * } |
| * if (!it.is_valid()) { |
| * return err; |
| * } |
| */ |
| struct FullRangeGetOptions : public RangeGetOptions { |
| // NOTE(review): presumably the store used to open a new transaction for each inner range get |
| // when `txn` is null -- confirm against the implementation. |
| std::shared_ptr<TxnKv> txn_kv; |
| // When true, the next batch of kvs is fetched before it is accessed. |
| bool prefetch {false}; |
| // When non-zero, the exact number of key-value pairs to return. |
| int exact_limit {0}; |
| // Non-owning reference. When set, every inner range get goes through this transaction; |
| // when null, every inner range get goes through a new transaction. |
| Transaction* txn {nullptr}; |
| // Optional pool that collects the inner iterators once they are fully iterated, for users |
| // who want the kv pairs returned by `next()` to live longer. |
| std::vector<std::unique_ptr<RangeGetIterator>>* obj_pool {nullptr}; |
| |
| FullRangeGetOptions() = default; |
| FullRangeGetOptions(std::shared_ptr<TxnKv> kv) : txn_kv(std::move(kv)) {} |
| }; |
| |
| // Abstract iterator over an entire key range; the result type of `full_range_get()`. |
| // The concrete implementation declared in this header is `fdb::FullRangeGetIterator`. |
| class FullRangeGetIterator { |
| public: |
| FullRangeGetIterator() = default; |
| |
| virtual ~FullRangeGetIterator() = default; |
| |
| // False once an error has occurred; `error_code()` then reports the cause. |
| virtual bool is_valid() const = 0; |
| |
| // Whether another key-value pair is available. Returns false for an empty range. |
| // NOTE(review): non-const, presumably because implementations may fetch the next batch |
| // here -- confirm. |
| virtual bool has_next() = 0; |
| |
| // The error recorded by the iterator, TXN_OK while it is valid. |
| virtual TxnErrorCode error_code() const = 0; |
| |
| // Returns the next key-value pair, or an empty optional when there is nothing more to |
| // return. Callers should check `is_valid()` afterwards to tell the end of the range from |
| // an error. |
| virtual std::optional<std::pair<std::string_view, std::string_view>> next() = 0; |
| |
| // NOTE(review): presumably returns what `next()` would return without advancing -- confirm. |
| virtual std::optional<std::pair<std::string_view, std::string_view>> peek() = 0; |
| }; |
| |
| // Abstract transactional key-value store: creates transactions and full-range iterators. |
| // The FoundationDB-backed implementation declared in this header is `FdbTxnKv`. |
| class TxnKv { |
| public: |
| TxnKv() = default; |
| virtual ~TxnKv() = default; |
| |
| /** |
| * Creates a transaction |
| * TODO: add options to create the txn |
| * |
| * @param txn output param |
| * @return TXN_OK for success |
| */ |
| virtual TxnErrorCode create_txn(std::unique_ptr<Transaction>* txn) = 0; |
| |
| // Initializes the store. |
| // NOTE(review): presumably returns 0 for success and non-zero otherwise -- confirm. |
| virtual int init() = 0; |
| |
| // Returns an iterator over the whole range [begin, end), configured by `opts`. |
| // See `Transaction::full_range_get` for the validity/emptiness contract of the iterator. |
| virtual std::unique_ptr<FullRangeGetIterator> full_range_get(std::string begin, std::string end, |
| FullRangeGetOptions opts) = 0; |
| }; |
| |
| // Abstract transaction over the key-value store. Instances are handed out through |
| // `TxnKv::create_txn()`; the FoundationDB-backed implementation declared in this header is |
| // `fdb::Transaction`. |
| class Transaction { |
| public: |
| Transaction() = default; |
| virtual ~Transaction() = default; |
| |
| virtual void put(std::string_view key, std::string_view val) = 0; |
| |
| /** |
| * @param snapshot if true, `key` will not be included in txn conflict detection this time |
| * @return TXN_OK for success get a key, TXN_KEY_NOT_FOUND for key not found, otherwise for error |
| */ |
| virtual TxnErrorCode get(std::string_view key, std::string* val, bool snapshot = false) = 0; |
| |
| /** |
| * Closed-open range |
| * @param begin the begin key, inclusive |
| * @param end the end key, exclusive |
| * @param iter output param for the iterator to iterate over the key-value pairs in the specified range. |
| * If the range is empty, `has_next()` on the iterator will return false. |
| * NOTE(review): `RangeGetIterator` declares neither `is_valid()` nor `error_code()`; errors of |
| * this call are reported through the return value. |
| * @param opts options for range get |
| * - `snapshot`: if true, the range will not be included in txn conflict detection this time. |
| * - `batch_limit`: the maximum number of key-value pairs to return. |
| * - `reverse`: if true, the iterator will return keys in reverse order. |
| * @return TXN_OK for success, otherwise for error |
| */ |
| virtual TxnErrorCode get(std::string_view begin, std::string_view end, |
| std::unique_ptr<RangeGetIterator>* iter, |
| const RangeGetOptions& opts) = 0; |
| |
| // A convenience method for `get` with default options, to keep backward compatibility. |
| // `snapshot` and `limit` are copied into `RangeGetOptions::snapshot` / `batch_limit`. |
| TxnErrorCode get(std::string_view begin, std::string_view end, |
| std::unique_ptr<RangeGetIterator>* iter, bool snapshot = false, |
| int limit = 10000) { |
| RangeGetOptions opts; |
| opts.snapshot = snapshot; |
| opts.batch_limit = limit; |
| return get(begin, end, iter, opts); |
| } |
| |
| /** |
| * Get a full range of key-value pairs. |
| * @param begin the begin key, inclusive |
| * @param end the end key, exclusive |
| * @param opts options for full range get |
| * @return a FullRangeGetIterator for iterating over the key-value pairs in the specified range. |
| * If the range is empty, the iterator will be valid but `has_next()` will return false. |
| * If an error occurs, the iterator will be invalid and `error_code()` will return the error code. |
| */ |
| virtual std::unique_ptr<FullRangeGetIterator> full_range_get( |
| std::string_view begin, std::string_view end, |
| FullRangeGetOptions opts = FullRangeGetOptions()) = 0; |
| |
| /** |
| * Put a key-value pair in which the key will be in the form of |
| * `key_prefix + versiontimestamp`. `versiontimestamp` is autogenerated by the |
| * system and it's 10-byte long and encoded in big-endian |
| * |
| * @param key_prefix prefix for key conversion, can be zero-length |
| * @param val value |
| */ |
| virtual void atomic_set_ver_key(std::string_view key_prefix, std::string_view val) = 0; |
| |
| /** |
| * Put a key-value pair in which the key will be in the form of `key_prefix + versiontimestamp + key_suffix`. |
| * `versiontimestamp` is autogenerated by the system and it's 10-byte long and encoded in big-endian. |
| * |
| * @param key key for conversion, it should be at least 10-byte long. |
| * @param offset the offset of the versionstamp to place. `offset` + 10 must be less than or equal to |
| * the length of `key`. |
| * @param val value |
| * @return true for success, otherwise the offset is invalid or the key is too short. |
| */ |
| virtual bool atomic_set_ver_key(std::string_view key, uint32_t offset, |
| std::string_view val) = 0; |
| |
| /** |
| * Put a key-value pair in which the stored value will be in the form of |
| * `val + versiontimestamp`. `versiontimestamp` is autogenerated by the |
| * system and it's 10-byte long and encoded in big-endian |
| * NOTE(review): the original text described the key (not the value) as carrying the |
| * versionstamp and looked copy-pasted from `atomic_set_ver_key` -- confirm the layout. |
| * |
| * @param key the key to write |
| * @param val value the versionstamp is combined with |
| */ |
| virtual void atomic_set_ver_value(std::string_view key, std::string_view val) = 0; |
| |
| /** |
| * Adds a value to database. |
| * |
| * The default value is zero if no such key exists before. |
| * |
| * @param key the key whose value is modified |
| * @param to_add positive for addition, negative for subtraction |
| * |
| * This function returns nothing (it is `void`). |
| */ |
| virtual void atomic_add(std::string_view key, int64_t to_add) = 0; |
| // TODO: min max or and xor cmp_and_clear set_ver_value |
| |
| /** |
| * Decode the atomic value written by `atomic_add`. |
| * |
| * @param data the data to decode |
| * @param val output param for the decoded value |
| * @return true for success, otherwise the data format is invalid. |
| */ |
| virtual bool decode_atomic_int(std::string_view data, int64_t* val) = 0; |
| |
| virtual void remove(std::string_view key) = 0; |
| |
| /** |
| * Remove a closed-open range |
| */ |
| virtual void remove(std::string_view begin, std::string_view end) = 0; |
| |
| /** |
| * Commits this transaction. |
| *@return TXN_OK for success otherwise error |
| */ |
| virtual TxnErrorCode commit() = 0; |
| |
| /** |
| * Gets the read version used by the txn. |
| * Note that it does not make any sense we call this function before |
| * any `Transaction::get()` is called. |
| * |
| *@return TXN_OK for success otherwise error |
| */ |
| virtual TxnErrorCode get_read_version(int64_t* version) = 0; |
| |
| /** |
| * Gets the committed version used by the txn. |
| * Note that it does not make any sense we call this function before |
| * a successful call to `Transaction::commit()`. |
| * |
| *@return TXN_OK for success, TXN_CONFLICT for conflict, otherwise error |
| */ |
| virtual TxnErrorCode get_committed_version(int64_t* version) = 0; |
| |
| /** |
| * Aborts this transaction |
| * |
| * @return TXN_OK for success otherwise error |
| */ |
| virtual TxnErrorCode abort() = 0; |
| |
| struct BatchGetOptions { |
| BatchGetOptions() : BatchGetOptions(false) {}; |
| BatchGetOptions(bool s) : snapshot(s), concurrency(1000) {}; |
| |
| // if true, `key` will not be included in txn conflict detection this time. |
| // |
| // Default: false |
| bool snapshot; |
| |
| // the maximum number of concurrent requests submitted to fdb at one time. |
| // |
| // Default: 1000 |
| int concurrency; |
| |
| // Used for `batch_scan`, if true, the underlying iterator will return keys in reverse order. |
| // |
| // Default: false |
| bool reverse = false; |
| }; |
| |
| /** |
| * @brief batch get keys |
| * |
| * @param res output param; one optional per requested key |
| * @param keys the keys to get |
| * @param opts see `BatchGetOptions` |
| * @return If all keys are successfully retrieved, return TXN_OK. Otherwise, return the code of the first occurring error |
| */ |
| virtual TxnErrorCode batch_get(std::vector<std::optional<std::string>>* res, |
| const std::vector<std::string>& keys, |
| const BatchGetOptions& opts = BatchGetOptions()) = 0; |
| |
| /** |
| * @brief Batch scan for the first key-value pair with each given key prefix. |
| * |
| * For each key prefix in the input key_prefixs, this function scans for keys that start with |
| * that prefix in the direction specified by `opts.reverse` (forward by default) and returns |
| * the first key-value pair encountered. If no key with the given prefix is found, the |
| * corresponding result is an empty optional. |
| * |
| * The function scans keys in batches and is more efficient than scanning each prefix individually. |
| * |
| * Unlike its neighbours this overload is virtual but not pure; it is only declared here and |
| * its definition lives outside this header. |
| * |
| * @param[out] res The output vector of optionals. Each element corresponds to the same index as in the input key_prefixs. |
| * If a key-value pair with the prefix is found, the element will contain the pair; otherwise, it will be std::nullopt. |
| * @param[in] key_prefixs The list of key prefixes to scan for. |
| * @param[in] opts Options such as `reverse` and `snapshot`. If `reverse` is true, the scan is in the backward direction. |
| * |
| * @return TXN_OK if all scans completed successfully. If any error occurs during the scanning, |
| * the function stops immediately and returns the error code of the first error encountered. |
| * Note: The output vector `res` may be partially filled when an error occurs. |
| */ |
| virtual TxnErrorCode batch_scan( |
| std::vector<std::optional<std::pair<std::string, std::string>>>* res, |
| const std::vector<std::string>& key_prefixs, |
| const BatchGetOptions& opts = BatchGetOptions()); |
| |
| /** |
| * @brief Batch scan for the first key-value pair within each given range. |
| * |
| * For each range in the input ranges, this function performs a range scan from the begin key |
| * (inclusive) to the end key (exclusive) in the direction specified by `opts.reverse` (forward |
| * by default). If no keys are found within a range, the corresponding result is an empty optional. |
| * |
| * NOTE(review): the original text promised "all key-value pairs" / "a vector of pairs" per range, |
| * but each result element can hold at most ONE pair; presumably it is the first pair encountered |
| * in scan direction -- confirm against the implementation. |
| * |
| * The function scans ranges in batches and is more efficient than scanning each range individually. |
| * |
| * @param[out] res The output vector of optionals. Each element corresponds to the same index as in the input ranges. |
| * If a key-value pair is found within the range, the element will contain that pair; |
| * otherwise, it will be std::nullopt. |
| * @param[in] ranges The list of ranges to scan. Each range is a pair of (begin_key, end_key) where |
| * begin_key is inclusive and end_key is exclusive. |
| * @param[in] opts Options such as `reverse` and `snapshot`. If `reverse` is true, the scan is in the backward direction. |
| * |
| * @return TXN_OK if all scans completed successfully. If any error occurs during the scanning, |
| * the function stops immediately and returns the error code of the first error encountered. |
| * Note: The output vector `res` may be partially filled when an error occurs. |
| */ |
| virtual TxnErrorCode batch_scan( |
| std::vector<std::optional<std::pair<std::string, std::string>>>* res, |
| const std::vector<std::pair<std::string, std::string>>& ranges, |
| const BatchGetOptions& opts = BatchGetOptions()) = 0; |
| |
| /** |
| * @brief return the approximate bytes consumed by the underlying transaction buffer. |
| **/ |
| virtual size_t approximate_bytes() const = 0; |
| |
| /** |
| * @brief return the num get keys submitted to this txn. |
| **/ |
| virtual size_t num_get_keys() const = 0; |
| |
| /** |
| * @brief return the num delete keys submitted to this txn. |
| **/ |
| virtual size_t num_del_keys() const = 0; |
| |
| /** |
| * @brief return the num put keys submitted to this txn. |
| **/ |
| virtual size_t num_put_keys() const = 0; |
| |
| /** |
| * @brief return the bytes of the delete keys consumed. |
| **/ |
| virtual size_t delete_bytes() const = 0; |
| |
| /** |
| * @brief return the bytes of the get values consumed. |
| **/ |
| virtual size_t get_bytes() const = 0; |
| |
| /** |
| * @brief return the bytes of the put key and values consumed. |
| **/ |
| virtual size_t put_bytes() const = 0; |
| |
| /** |
| * @brief Enable getting versionstamp for this transaction. |
| * Must be called before commit() if you want to get versionstamp. |
| * The default implementation does nothing. |
| **/ |
| virtual void enable_get_versionstamp() {} |
| |
| /** |
| * @brief Get the versionstamp used by the transaction. |
| * Only available after a successful commit() and when enable_get_versionstamp() was called. |
| * The default implementation always returns TXN_INVALID_ARGUMENT. |
| * @param versionstamp output parameter to store the versionstamp |
| * @return TXN_OK for success, TXN_INVALID_ARGUMENT if not enabled, |
| * TXN_KEY_NOT_FOUND if not available |
| **/ |
| virtual TxnErrorCode get_versionstamp(Versionstamp* versionstamp) { |
| return TxnErrorCode::TXN_INVALID_ARGUMENT; |
| } |
| }; |
| |
| // Abstract iterator over one page of key-value pairs returned by a range get. |
| // Non-copyable. The implementation declared in this header is `fdb::RangeGetIterator`. |
| class RangeGetIterator { |
| public: |
| RangeGetIterator() = default; |
| virtual ~RangeGetIterator() = default; |
| |
| /** |
| * Checks if we can call `next()` on this iterator. |
| */ |
| virtual bool has_next() const = 0; |
| |
| /** |
| * Gets next element, this is usually called after a check of `has_next()` succeeds, |
| * If `has_next()` is not checked, the return value may be undefined. |
| * |
| * @return a kv pair |
| */ |
| virtual std::pair<std::string_view, std::string_view> next() = 0; |
| |
| /** |
| * Gets next element but does not advance the cursor, this is usually called after a check of |
| * `has_next()` succeeds. If `has_next()` is not checked, the return value may be undefined. |
| * |
| * @return a kv pair |
| */ |
| virtual std::pair<std::string_view, std::string_view> peek() const = 0; |
| |
| /** |
| * Repositions the offset to `pos` |
| */ |
| virtual void seek(size_t pos) = 0; |
| |
| /** |
| * Checks if there are more KVs to be got from the range, caller usually wants |
| * to issue another `get` with the last key of this iteration. |
| * |
| * @return if there are more kvs that this iterator cannot cover |
| */ |
| virtual bool more() const = 0; |
| |
| /** |
| * |
| * Gets size of the range, some kinds of iterators may not support this function. |
| * |
| * @return size |
| */ |
| virtual int size() const = 0; |
| |
| /** |
| * Gets the total number of bytes of the key-value pairs held by this iterator; the bytes of |
| * the keys are counted as well as the bytes of the values. |
| */ |
| virtual int64_t get_kv_bytes() const = 0; |
| |
| /** |
| * Get the remaining size of the range, some kinds of iterators may not support this function. |
| * |
| * @return size |
| */ |
| virtual int remaining() const = 0; |
| |
| /** |
| * Resets to initial state, some kinds of iterators may not support this function. |
| */ |
| virtual void reset() = 0; |
| |
| /** |
| * Get the begin key of the next iterator if `more()` is true, otherwise returns empty string. |
| */ |
| virtual std::string next_begin_key() const = 0; |
| |
| /** |
| * Get the last key of the iterator, it can be used as the end key of the next iterator when |
| * the key selector is FIRST_GREATER_OR_EQUAL. |
| * |
| * ATTN: This is ONLY used for reverse range get. |
| */ |
| virtual std::string_view last_key() const = 0; |
| |
| RangeGetIterator(const RangeGetIterator&) = delete; |
| RangeGetIterator& operator=(const RangeGetIterator&) = delete; |
| }; |
| |
| // ============================================================================= |
| // FoundationDB implementation of TxnKv |
| // ============================================================================= |
| |
| // Forward declarations of the FDB wrappers: `FdbTxnKv` below holds shared pointers to |
| // `fdb::Network` and `fdb::Database` before those classes are defined. |
| namespace fdb { |
| class Database; |
| class Transaction; |
| class Network; |
| } // namespace fdb |
| |
| // `TxnKv` backed by FoundationDB; holds the FDB network and database wrappers. |
| class FdbTxnKv : public TxnKv { |
| public: |
| FdbTxnKv() = default; |
| ~FdbTxnKv() override = default; |
| |
| // See `TxnKv::create_txn`. |
| TxnErrorCode create_txn(std::unique_ptr<Transaction>* txn) override; |
| // NOTE(review): presumably like `create_txn` but with access to system keys enabled |
| // (cf. `fdb::Transaction::enable_access_system_keys`) -- confirm in the implementation. |
| TxnErrorCode create_txn_with_system_access(std::unique_ptr<Transaction>* txn); |
| |
| int init() override; |
| |
| std::unique_ptr<FullRangeGetIterator> full_range_get(std::string begin, std::string end, |
| FullRangeGetOptions opts) override; |
| |
| // Return the partition boundaries of the database. |
| TxnErrorCode get_partition_boundaries(std::vector<std::string>* boundaries); |
| |
| // Returns a value where 0 indicates that the client is idle and 1 (or larger) indicates that |
| // the client is saturated. This value is updated every second. |
| double get_client_thread_busyness() const; |
| |
| // First key of the `\xff/keyServers/` key space. |
| static std::string_view fdb_partition_key_prefix() { |
| const std::string_view prefix = "\xff/keyServers/"; |
| return prefix; |
| } |
| |
| // End of that key space: the prefix with its trailing '/' replaced by '0', because |
| // '0' is the next byte after '/' in the ASCII table. |
| static std::string_view fdb_partition_key_end() { |
| const std::string_view end_key = "\xff/keyServers0"; |
| return end_key; |
| } |
| |
| private: |
| std::shared_ptr<fdb::Network> network_; |
| std::shared_ptr<fdb::Database> database_; |
| }; |
| |
| namespace fdb { |
| |
| // Wrapper around the FDB client network, which runs on `network_thread_`. |
| class Network { |
| public: |
| Network(FDBNetworkOption opt) : opt_(opt) {} |
| |
| /** |
| * @return 0 for success otherwise non-zero |
| */ |
| int init(); |
| |
| /** |
| * Notify the network thread to stop, this is an async. call, check |
| * Network::working to ensure the network exited finally. |
| * |
| * FIXME: may be we can implement it as a sync. function. |
| */ |
| void stop(); |
| |
| ~Network() = default; |
| |
| private: |
| std::shared_ptr<std::thread> network_thread_; |
| FDBNetworkOption opt_; |
| |
| // Global state, only one instance of Network is allowed |
| static std::atomic<bool> working; |
| }; |
| |
| // Owns an `FDBDatabase` handle and destroys it in the destructor. Also keeps the `Network` |
| // it was created with alive through a shared pointer. |
| class Database { |
| public: |
| /** |
| * @param net the network wrapper, shared with this database |
| * @param cluster_file path of the FDB cluster file |
| * @param opt database option stored for later use |
| */ |
| Database(std::shared_ptr<Network> net, std::string cluster_file, FDBDatabaseOption opt) |
| : network_(std::move(net)), cluster_file_path_(std::move(cluster_file)), opt_(opt) {} |
| |
| /** |
| * |
| * @return 0 for success, otherwise failure |
| */ |
| int init(); |
| |
| ~Database() { |
| if (db_ != nullptr) fdb_database_destroy(db_); |
| } |
| |
| // Non-owning access to the underlying handle; null until one has been created. |
| FDBDatabase* db() { return db_; } |
| |
| std::shared_ptr<Transaction> create_txn(FDBTransactionOption opt); |
| |
| // Non-copyable: a copy would share `db_` and the handle would be destroyed twice. |
| Database(const Database&) = delete; |
| Database& operator=(const Database&) = delete; |
| |
| private: |
| std::shared_ptr<Network> network_; |
| std::string cluster_file_path_; |
| FDBDatabase* db_ = nullptr; |
| FDBDatabaseOption opt_; |
| }; |
| |
| /** |
| * Iterator over one page of key-value pairs of an FDB range read. |
| * |
| * A freshly constructed iterator is empty (`kvs_size_ == -1`, `idx_ == -1`). |
| * NOTE(review): `init()` is defined out of line; presumably it waits for the future and fills |
| * `kvs_`, `kvs_size_`, `more_` and `idx_` -- confirm. |
| * The string views handed out by this class point into the `kvs_` array. |
| */ |
| class RangeGetIterator : public cloud::RangeGetIterator { |
| public: |
| /** |
| * @param fut the future of the range read |
| * @param owns if true (the default) the iterator takes the ownership of `fut` and destroys it |
| * in its destructor |
| */ |
| RangeGetIterator(FDBFuture* fut, bool owns = true) |
| : fut_(fut), owns_fut_(owns), kvs_(nullptr), kvs_size_(-1), more_(false), idx_(-1) {} |
| |
| /** |
| * Move constructor: takes over the future and the cursor state of `o` and leaves `o` empty. |
| * |
| * All members are set in the initializer list. The object under construction holds no future |
| * yet, so there is nothing to destroy here (reading `fut_` before it is initialized would be |
| * undefined behavior). |
| */ |
| RangeGetIterator(RangeGetIterator&& o) noexcept |
| : fut_(std::exchange(o.fut_, nullptr)), |
| owns_fut_(o.owns_fut_), |
| kvs_(std::exchange(o.kvs_, nullptr)), |
| kvs_size_(std::exchange(o.kvs_size_, 0)), |
| more_(std::exchange(o.more_, false)), |
| idx_(std::exchange(o.idx_, 0)) {} |
| |
| ~RangeGetIterator() override { |
| // Release all memory |
| if (fut_ && owns_fut_) fdb_future_destroy(fut_); |
| } |
| |
| TxnErrorCode init(); |
| |
| // Returns the pair at the cursor and advances; returns an empty pair when out of range. |
| std::pair<std::string_view, std::string_view> next() override { |
| if (!in_range()) return {}; |
| return as_views(kvs_[idx_++]); |
| } |
| |
| // Returns the pair at the cursor without advancing; returns an empty pair when out of range. |
| std::pair<std::string_view, std::string_view> peek() const override { |
| if (!in_range()) return {}; |
| return as_views(kvs_[idx_]); |
| } |
| |
| void seek(size_t pos) override { idx_ = static_cast<int>(pos); } |
| |
| // True only while the cursor addresses an element, i.e. exactly when `next()` would return |
| // one. A negative cursor yields false, so `while (has_next()) next();` always terminates. |
| bool has_next() const override { return in_range(); } |
| |
| /** |
| * Check if there are more KVs to be got from the range, caller usually wants |
| * to issue another `get` with the last key of this iteration. |
| */ |
| bool more() const override { return more_; } |
| |
| int size() const override { return kvs_size_; } |
| |
| // Sums the key and value lengths of all pairs held by this iterator. |
| int64_t get_kv_bytes() const override { |
| int64_t total_bytes {}; |
| for (int i = 0; i < kvs_size_; i++) { |
| // Widen before adding so that the sum of two int lengths cannot overflow. |
| total_bytes += static_cast<int64_t>(kvs_[i].key_length) + kvs_[i].value_length; |
| } |
| return total_bytes; |
| } |
| |
| int remaining() const override { |
| if (!in_range()) return 0; |
| return kvs_size_ - idx_; |
| } |
| |
| void reset() override { idx_ = 0; } |
| |
| // Returns the last key of this page with a trailing '\x00' appended, or an empty string when |
| // `more()` is false or the page holds no pairs. |
| std::string next_begin_key() const override { |
| std::string k; |
| // Without any pair there is no last key to continue from. |
| if (!more() || kvs_size_ <= 0) return k; |
| const auto& kv = kvs_[kvs_size_ - 1]; |
| const auto key_len = static_cast<size_t>(kv.key_length); |
| k.reserve(key_len + 1); |
| k.append(reinterpret_cast<const char*>(kv.key), key_len); |
| k.push_back('\x00'); |
| return k; |
| } |
| |
| // Returns a view of the last key of this page, or an empty view when the page holds no pairs. |
| std::string_view last_key() const override { |
| if (kvs_size_ <= 0) return {}; |
| const auto& kv = kvs_[kvs_size_ - 1]; |
| return {reinterpret_cast<const char*>(kv.key), static_cast<size_t>(kv.key_length)}; |
| } |
| |
| RangeGetIterator(const RangeGetIterator&) = delete; |
| RangeGetIterator& operator=(const RangeGetIterator&) = delete; |
| |
| private: |
| // Whether the cursor addresses an element of `kvs_`. |
| bool in_range() const { return idx_ >= 0 && idx_ < kvs_size_; } |
| |
| // Builds non-owning views of the key and the value of `kv`. |
| static std::pair<std::string_view, std::string_view> as_views(const FDBKeyValue& kv) { |
| return {{reinterpret_cast<const char*>(kv.key), static_cast<size_t>(kv.key_length)}, |
| {reinterpret_cast<const char*>(kv.value), static_cast<size_t>(kv.value_length)}}; |
| } |
| |
| FDBFuture* fut_; |
| bool owns_fut_; |
| const FDBKeyValue* kvs_; |
| int kvs_size_; |
| fdb_bool_t more_; |
| int idx_; |
| }; |
| |
| // FoundationDB implementation of `cloud::Transaction`. Owns an `FDBTransaction` handle that is |
| // destroyed in the destructor, and keeps counters about the keys and bytes it has handled. |
| class Transaction : public cloud::Transaction { |
| FRIEND_TEST(TxnKvTest, ReportConflictingRange); |
| FRIEND_TEST(TxnKvTest, VersionedGetConflictRange); |
| |
| public: |
| friend class Database; |
| friend class FullRangeGetIterator; |
| |
| Transaction(std::shared_ptr<Database> db) : db_(std::move(db)) {} |
| |
| ~Transaction() override { |
| if (txn_) fdb_transaction_destroy(txn_); |
| } |
| |
| // Non-copyable: a copy would share `txn_` and the handle would be destroyed twice. |
| Transaction(const Transaction&) = delete; |
| Transaction& operator=(const Transaction&) = delete; |
| |
| /** |
| * |
| * @return TXN_OK for success, otherwise error |
| */ |
| TxnErrorCode init(); |
| TxnErrorCode enable_access_system_keys(); |
| |
| void put(std::string_view key, std::string_view val) override; |
| |
| // Keep the non-virtual convenience overload of `get` from the base class visible; the |
| // overrides below would otherwise hide it. |
| using cloud::Transaction::get; |
| /** |
| * @param snapshot if true, `key` will not be included in txn conflict detection this time |
| * @return TXN_OK for success get a key, TXN_KEY_NOT_FOUND for key not found, otherwise for error |
| */ |
| TxnErrorCode get(std::string_view key, std::string* val, bool snapshot = false) override; |
| /** |
| * Closed-open range |
| * @param begin the begin key, inclusive |
| * @param end the end key, exclusive |
| * @param iter output param for the iterator to iterate over the key-value pairs in the specified range. |
| * If the range is empty, `has_next()` on the iterator will return false. |
| * @param opts options for range get |
| * - `snapshot`: if true, the range will not be included in txn conflict detection this time. |
| * - `batch_limit`: the maximum number of key-value pairs to return. |
| * - `reverse`: if true, the iterator will return keys in reverse order. |
| * @return TXN_OK for success, otherwise for error |
| */ |
| TxnErrorCode get(std::string_view begin, std::string_view end, |
| std::unique_ptr<cloud::RangeGetIterator>* iter, |
| const RangeGetOptions& opts) override; |
| |
| std::unique_ptr<cloud::FullRangeGetIterator> full_range_get( |
| std::string_view begin, std::string_view end, |
| FullRangeGetOptions opts = FullRangeGetOptions()) override; |
| |
| /** |
| * Put a key-value pair in which the key will be in the form of |
| * `key_prefix + versiontimestamp`. `versiontimestamp` is autogenerated by the |
| * system and it's 10-byte long and encoded in big-endian |
| * |
| * @param key_prefix prefix for key conversion, can be zero-length |
| * @param val value |
| */ |
| void atomic_set_ver_key(std::string_view key_prefix, std::string_view val) override; |
| |
| /** |
| * Put a key-value pair in which the key will be in the form of `key_prefix + versiontimestamp + key_suffix`. |
| * |
| * @param key key for conversion, it should be at least 10-byte long. |
| * @param offset the offset of the versionstamp to place. `offset` + 10 must be less than or equal to |
| * the length of `key`. |
| * @param val value |
| * @return true for success, otherwise the offset is invalid or the key is too short. |
| */ |
| bool atomic_set_ver_key(std::string_view key, uint32_t offset, std::string_view val) override; |
| |
| /** |
| * Put a key-value pair in which the stored value will be in the form of |
| * `val + versiontimestamp`. `versiontimestamp` is autogenerated by the |
| * system and it's 10-byte long and encoded in big-endian |
| * NOTE(review): the original text described the key (not the value) as carrying the |
| * versionstamp -- confirm the layout against the implementation. |
| * |
| * @param key the key to write |
| * @param val value the versionstamp is combined with |
| */ |
| void atomic_set_ver_value(std::string_view key, std::string_view val) override; |
| |
| /** |
| * Adds a value to database |
| * @param to_add positive for addition, negative for subtraction |
| */ |
| void atomic_add(std::string_view key, int64_t to_add) override; |
| // TODO: min max or and xor cmp_and_clear set_ver_value |
| |
| bool decode_atomic_int(std::string_view data, int64_t* val) override; |
| |
| void remove(std::string_view key) override; |
| |
| /** |
| * Remove a closed-open range |
| */ |
| void remove(std::string_view begin, std::string_view end) override; |
| |
| /** |
| * |
| *@return TXN_OK for success, TXN_CONFLICT for conflict, otherwise for error |
| */ |
| TxnErrorCode commit() override; |
| |
| TxnErrorCode get_read_version(int64_t* version) override; |
| TxnErrorCode get_committed_version(int64_t* version) override; |
| |
| TxnErrorCode abort() override; |
| |
| void enable_get_versionstamp() override; |
| |
| TxnErrorCode get_versionstamp(Versionstamp* versionstamp) override; |
| |
| TxnErrorCode batch_get(std::vector<std::optional<std::string>>* res, |
| const std::vector<std::string>& keys, |
| const BatchGetOptions& opts = BatchGetOptions()) override; |
| |
| // NOTE(review): only the range overload of `batch_scan` is declared here, so the key-prefix |
| // overload of the base class is hidden when calling through `fdb::Transaction` directly. |
| TxnErrorCode batch_scan(std::vector<std::optional<std::pair<std::string, std::string>>>* res, |
| const std::vector<std::pair<std::string, std::string>>& ranges, |
| const BatchGetOptions& opts = BatchGetOptions()) override; |
| |
| // The accessors below return the counters kept in the private members of the same name. |
| size_t approximate_bytes() const override { return approximate_bytes_; } |
| |
| size_t num_get_keys() const override { return num_get_keys_; } |
| |
| size_t num_del_keys() const override { return num_del_keys_; } |
| |
| size_t num_put_keys() const override { return num_put_keys_; } |
| |
| size_t delete_bytes() const override { return delete_bytes_; } |
| |
| size_t put_bytes() const override { return put_bytes_; } |
| |
| size_t get_bytes() const override { return get_bytes_; } |
| |
| private: |
| // Return the conflicting range when the transaction commit returns TXN_CONFLICT. |
| // |
| // It only works when the report_conflicting_ranges option is enabled. |
| TxnErrorCode get_conflicting_range( |
| std::vector<std::pair<std::string, std::string>>* key_values); |
| TxnErrorCode report_conflicting_range(); |
| |
| std::shared_ptr<Database> db_ {nullptr}; |
| bool commited_ = false; |
| bool aborted_ = false; |
| // Owned handle, destroyed in the destructor; null until one has been created. |
| FDBTransaction* txn_ = nullptr; |
| |
| size_t num_get_keys_ {0}; |
| size_t num_del_keys_ {0}; |
| size_t num_put_keys_ {0}; |
| size_t delete_bytes_ {0}; |
| size_t get_bytes_ {0}; |
| size_t put_bytes_ {0}; |
| size_t approximate_bytes_ {0}; |
| |
| bool versionstamp_enabled_ {false}; |
| Versionstamp versionstamp_result_; |
| }; |
| |
| // FoundationDB implementation of `cloud::FullRangeGetIterator`. |
| // The iterator state is the error code `code_`: it is valid exactly while `code_` is TXN_OK. |
| class FullRangeGetIterator final : public cloud::FullRangeGetIterator { |
| public: |
| FullRangeGetIterator(std::string begin, std::string end, FullRangeGetOptions opts); |
| |
| ~FullRangeGetIterator() override; |
| |
| bool is_valid() const override { return code_ == TxnErrorCode::TXN_OK; } |
| |
| bool has_next() override; |
| |
| TxnErrorCode error_code() const override { return code_; } |
| |
| std::optional<std::pair<std::string_view, std::string_view>> next() override; |
| |
| std::optional<std::pair<std::string_view, std::string_view>> peek() override; |
| |
| private: |
| // Makes the iterator invalid if it meets any error (validity is derived from `code_`). |
| void init(); |
| |
| // Await `fut_` and create new inner iter. |
| // Makes the iterator invalid if it meets any error (validity is derived from `code_`). |
| void await_future(); |
| |
| // Perform a paginate range get asynchronously and set `fut_`. |
| // Makes the iterator invalid if it meets any error (validity is derived from `code_`). |
| void async_inner_get(std::string_view begin, std::string_view end); |
| void async_get_next_batch(); |
| |
| bool prefetch(); |
| |
| FullRangeGetOptions opts_; |
| |
| TxnErrorCode code_ = TxnErrorCode::TXN_OK; |
| int num_consumed_ = 0; |
| std::string begin_; |
| std::string end_; |
| std::unique_ptr<Transaction> txn_; |
| std::unique_ptr<RangeGetIterator> inner_iter_; |
| FDBFuture* fut_ = nullptr; |
| }; |
| |
| } // namespace fdb |
| |
| } // namespace doris::cloud |