blob: 25d5ad30366dfda1f5c821cb0901411be837a664 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <cstddef>
#include <cstdint>
#include <memory>
#include <string>
#include <string_view>
#include <vector>
#include "meta-store/txn_kv.h"
#include "meta-store/txn_kv_error.h"
// Forward declaration of google::protobuf::Message. This header only refers to the type through
// pointers and references, so the full protobuf definition is not needed here.
namespace google::protobuf {
class Message;
}
namespace doris::cloud {
/**
 * Buffer for one logical value that may be stored as several split KVs.
 *
 * Supports splitting large values (>100KB) into multiple KVs, with a logical value size of up to
 * the fdb transaction limit (<10MB).
 * Supports multi-version value formats (a value can be any byte sequence): the version of a value
 * is recognized when it is read, keeping the format forward compatible with values written by
 * older versions.
 *
 * Key format:
 *   {origin_key}{suffix: i64}
 *
 * suffix (big-endian):
 *   |Bytes 0      |Bytes 1-5    |Bytes 6-7    |
 *   |-------------|-------------|-------------|
 *   |version      |dummy        |sequence     |
 */
struct ValueBuf {
// Parse the buffered value into `pb`. The result must not be ignored ([[nodiscard]]);
// presumably `true` means the message was parsed successfully -- confirm in the implementation.
// TODO(plat1ko): Support decompression
[[nodiscard]] bool to_pb(google::protobuf::Message* pb) const;
// TODO: More bool to_xxx(Xxx* xxx) const;
// Remove all split KVs held in `iters` via `txn`.
void remove(Transaction* txn) const;
// Get a key and save the raw split values of the key into `this`. The value length may be
// bigger than 100k.
// If `snapshot` is true, presumably the read is excluded from txn conflict detection, as
// documented for `blob_get` -- TODO confirm.
// Returns TXN_OK if the key was read successfully, TXN_KEY_NOT_FOUND if the key does not exist,
// and another error code otherwise.
TxnErrorCode get(Transaction* txn, std::string_view key, bool snapshot = false);
// Return the merged value held in this ValueBuf (returned by value, i.e. as a copy).
std::string value() const;
// Return all keys held in this ValueBuf. If the value is not split, the result has size 1.
std::vector<std::string> keys() const;
// Range-get iterators that hold the raw split KVs of the value.
std::vector<std::unique_ptr<RangeGetIterator>> iters;
// Version of the buffered value. Initialized to -1; presumably -1 means "nothing loaded yet".
int8_t ver {-1};
};
/**
 * Get a key and return its value. The value length may be bigger than 100k.
 *
 * @param txn fdb txn handler
 * @param key encoded key
 * @param val output: wrapper around the raw split values of the key
 * @param snapshot if true, `key` will not be included in txn conflict detection for this read
 * @return TXN_OK if the key was read successfully, TXN_KEY_NOT_FOUND if the key does not exist,
 *         and another error code otherwise.
 */
TxnErrorCode blob_get(Transaction* txn, std::string_view key, ValueBuf* val, bool snapshot = false);
/**
 * Put a KV whose value is a protobuf message. Its value may be bigger than 100k.
 * TODO(plat1ko): Support compression
 *
 * The function returns void, so no error is reported to the caller through the return value.
 *
 * @param txn fdb txn handler
 * @param key encoded key
 * @param pb value to save
 * @param ver value version
 * @param split_size size used to split the value into fragments; presumably the maximum number
 *        of bytes per fragment (default 90 * 1000) -- TODO confirm against the implementation
 */
void blob_put(Transaction* txn, std::string_view key, const google::protobuf::Message& pb,
uint8_t ver, size_t split_size = 90 * 1000);
/**
 * Put a KV whose value is a raw byte sequence. Its value may be bigger than 100k.
 *
 * The function returns void, so no error is reported to the caller through the return value.
 *
 * @param txn fdb txn handler
 * @param key encoded key
 * @param value value to save
 * @param ver value version
 * @param split_size size used to split the value into fragments; presumably the maximum number
 *        of bytes per fragment (default 90 * 1000) -- TODO confirm against the implementation
 */
void blob_put(Transaction* txn, std::string_view key, std::string_view value, uint8_t ver,
size_t split_size = 90 * 1000);
// Iterator over blob key-value pairs.
//
// All keys that share the same origin key are considered one blob. The origin key is obtained by
// removing the sequence suffix from the raw key.
//
// ATTN: This iterator is not compatible with the old style blob keys (without sequence suffix).
//
// The iterator is non-copyable (copy construction and copy assignment are deleted).
//
// Usage:
// auto iter = blob_get_range(txn_kv, begin_key, end_key);
// for (; iter->valid(); iter->next()) {
// // Process the current blob
// auto origin_key = iter->key();
// auto version = iter->version();
// auto raw_keys = iter->raw_keys();
// auto values = iter->values();
// // Or parse the blob value into a protobuf message
// YourProtoMessage pb;
// if (iter->parse_value(&pb)) {
// // Successfully parsed
// }
// }
// if (iter->error_code() != TxnErrorCode::TXN_OK) {
// // Handle error
// }
class BlobIterator {
public:
// Wrap an underlying full-range iterator; ownership of `iter` is taken (passed by value as a
// std::unique_ptr).
// NOTE(review): this single-argument constructor is not `explicit`, so a
// std::unique_ptr<FullRangeGetIterator> converts implicitly to a BlobIterator. Consider marking
// it `explicit` once all call sites are confirmed to construct it directly.
BlobIterator(std::unique_ptr<FullRangeGetIterator> iter);
~BlobIterator() = default;
BlobIterator(const BlobIterator&) = delete;
BlobIterator& operator=(const BlobIterator&) = delete;
// There are two states of validity:
// 1. The iterator is valid and points to a blob (a set of KVs with the same origin key).
// In this state, `valid()` returns true.
// 2. The iterator is invalid (either due to an error or because it has reached the end).
// In this state, `valid()` returns false. Use `error_code()` to tell the two cases apart.
bool valid() const;
// Move to the next blob.
//
// If there is no next blob, the iterator becomes invalid.
void next();
// Get the error code of the iterator. If the iterator has reached the end, returns TXN_OK.
TxnErrorCode error_code() const;
// Return the version of the current blob. `valid()` must be true before calling this method.
uint8_t version() const { return version_; }
// Return the origin key of the current blob. `valid()` must be true before calling this method.
// The returned view refers to storage owned by this iterator (`current_blob_key_`); presumably
// it is invalidated by `next()` -- copy it if it must outlive the current position.
std::string_view key() const { return current_blob_key_; }
// Return the raw keys of the current blob. `valid()` must be true before calling this method.
// The returned reference refers to a member of this iterator.
const std::vector<std::string>& raw_keys() const { return current_blob_raw_keys_; }
// Return the values of the current blob. `valid()` must be true before calling this method.
// The returned reference refers to a member of this iterator.
const std::vector<std::string>& values() const { return current_blob_values_; }
// Parse the value of the current blob into the given protobuf message. `valid()` must be true
// before calling this method. Per the usage example above, `true` means the message was parsed
// successfully.
bool parse_value(google::protobuf::Message* pb) const;
private:
// True when an underlying iterator is present and reports itself as valid.
bool is_underlying_iter_valid() const { return iter_ && iter_->is_valid(); }
// Presumably collects the KVs of the blob at the current position into the `current_blob_*`
// members -- see the implementation for details.
void load_current_blob();
// Split `raw_key` into its origin key, version and sequence, written through the output
// pointers. Presumably returns false when `raw_key` does not carry a valid suffix -- TODO confirm.
bool extract_origin_key(std::string_view raw_key, std::string* origin_key, uint8_t* version,
uint16_t* sequence);
// Error state of the iterator; starts as TXN_OK.
TxnErrorCode error_code_ = TxnErrorCode::TXN_OK;
// Underlying iterator over the raw KVs; owned by this object and may be null.
std::unique_ptr<FullRangeGetIterator> iter_;
// Version of the current blob (returned by `version()`); starts as 0.
uint8_t version_ = 0;
// Origin key of the current blob (returned by `key()`).
std::string current_blob_key_;
// Raw keys of the current blob (returned by `raw_keys()`).
std::vector<std::string> current_blob_raw_keys_;
// Values of the current blob (returned by `values()`).
std::vector<std::string> current_blob_values_;
};
// Get a range of blob key-value pairs, reading through the given `txn_kv`.
//
// It requires that all keys in the range are in the new blob format (with sequence suffix).
//
// `begin_key` and `end_key` delimit the range; whether each bound is inclusive is not stated
// here -- check the implementation. `snapshot` defaults to false; presumably it has the same
// meaning as in `blob_get` (skip txn conflict detection) -- TODO confirm.
// Iterate the result with `valid()`/`next()` and check `error_code()` afterwards, as shown in the
// usage comment on `BlobIterator`.
std::unique_ptr<BlobIterator> blob_get_range(const std::shared_ptr<TxnKv>& txn_kv,
std::string_view begin_key, std::string_view end_key,
bool snapshot = false);
// Get a range of blob key-value pairs, reading through the given transaction `txn`.
//
// It requires that all keys in the range are in the new blob format (with sequence suffix).
//
// `begin_key` and `end_key` delimit the range; whether each bound is inclusive is not stated
// here -- check the implementation. `snapshot` defaults to false; presumably it has the same
// meaning as in `blob_get` (skip txn conflict detection) -- TODO confirm.
// NOTE(review): `txn` is a non-owning pointer; presumably it must outlive the returned
// iterator -- confirm against the implementation.
std::unique_ptr<BlobIterator> blob_get_range(Transaction* txn, std::string_view begin_key,
std::string_view end_key, bool snapshot = false);
// Versioned variants of `blob_put`. Unlike the non-versioned overloads, `ver` defaults to 0 here.
// `split_size` defaults to 90 * 1000 in every overload; presumably it is the maximum number of
// bytes per fragment -- TODO confirm against the implementation.
namespace versioned {
// Put a blob message (protobuf) with an auto-generated versionstamp.
void blob_put(Transaction* txn, std::string_view key, const google::protobuf::Message& pb,
uint8_t ver = 0, size_t split_size = 90 * 1000);
// Put a blob message (raw bytes) with an auto-generated versionstamp.
void blob_put(Transaction* txn, std::string_view key, std::string_view value, uint8_t ver = 0,
size_t split_size = 90 * 1000);
// Put a blob message (protobuf) with the specified versionstamp `v`.
void blob_put(Transaction* txn, std::string_view key, Versionstamp v,
const google::protobuf::Message& pb, uint8_t ver = 0, size_t split_size = 90 * 1000);
// Put a blob message (raw bytes) with the specified versionstamp `v`.
void blob_put(Transaction* txn, std::string_view key, Versionstamp v, std::string_view value,
uint8_t ver = 0, size_t split_size = 90 * 1000);
} // namespace versioned
} // namespace doris::cloud