blob: c778ca671be59dec9d867de244d4dd9d1ce97644 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <glog/logging.h>
#include "common/util.h"
#include "meta-store/codec.h"
#include "meta-store/document_message.h"
#include "meta-store/txn_kv.h"
#include "meta-store/txn_kv_error.h"
#include "meta-store/versionstamp.h"
namespace doris::cloud::versioned {
// Options for reading document messages from a transaction.
//
// The default range is (begin_key, end_key].
struct ReadDocumentMessagesOptions {
// Whether to execute snapshot read?
bool snapshot = false;
// The maximum number of keys to return in a single batch.
int batch_limit = 0;
// The versionstamp to use for the snapshot read.
// If this is set to Versionstamp::max(), it will read the latest version.
// Otherwise, it will read the version before this versionstamp.
Versionstamp snapshot_version = Versionstamp::max();
// Whether to exclude the begin key from the result range.
bool exclude_begin_key = true;
// Whether to exclude the end key from the result range.
bool exclude_end_key = false;
};
template <typename Message>
requires IsProtobufMessage<Message>
class DocumentMessageIterator {
public:
using Element = std::tuple<std::string_view, Versionstamp, Message&>;
DocumentMessageIterator(std::unique_ptr<FullRangeGetIterator> iter,
Versionstamp snapshot_version)
: snapshot_version_(snapshot_version), iter_(std::move(iter)) {}
DocumentMessageIterator(const DocumentMessageIterator&) = delete;
DocumentMessageIterator& operator=(const DocumentMessageIterator&) = delete;
DocumentMessageIterator(DocumentMessageIterator&& other)
: snapshot_version_(other.snapshot_version_),
error_code_(other.error_code_),
has_find_(other.has_find_),
current_key_and_version_(std::move(other.current_key_and_version_)),
current_key_(other.current_key_),
current_version_(other.current_version_),
current_value_(std::move(other.current_value_)),
iter_(std::move(other.iter_)) {}
DocumentMessageIterator& operator=(DocumentMessageIterator&& other) {
if (this != &other) {
snapshot_version_ = other.snapshot_version_;
has_find_ = other.has_find_;
current_key_ = other.current_key_;
error_code_ = other.error_code_;
current_key_and_version_ = std::move(other.current_key_and_version_);
current_version_ = other.current_version_;
current_value_ = std::move(other.current_value_);
iter_ = std::move(other.iter_);
}
return *this;
}
TxnErrorCode error_code() const {
return error_code_ == TxnErrorCode::TXN_OK ? iter_->error_code() : error_code_;
}
bool is_valid() const { return iter_->is_valid() && error_code_ == TxnErrorCode::TXN_OK; }
bool has_next();
std::optional<Element> next();
std::optional<Element> peek();
private:
Versionstamp snapshot_version_;
TxnErrorCode error_code_ = TxnErrorCode::TXN_OK;
bool has_find_ = false;
std::string current_key_and_version_;
std::string_view current_key_;
Versionstamp current_version_;
Message current_value_;
std::unique_ptr<FullRangeGetIterator> iter_;
};
template <typename Message>
requires IsProtobufMessage<Message>
bool DocumentMessageIterator<Message>::has_next() {
while (is_valid() && !has_find_ && iter_->has_next()) {
auto [key, value] = iter_->peek().value();
if (!current_key_and_version_.empty() && key.starts_with(current_key_and_version_)) {
// Skip keys that are part of the current document
iter_->next();
continue;
}
// Parse the key to extract the document key prefix and versionstamp
std::string_view document_key = key;
Versionstamp version;
// Decode the versionstamp from the end of the key
if (decode_tailing_versionstamp_end(&document_key) ||
decode_tailing_versionstamp(&document_key, &version)) {
LOG(ERROR) << "Failed to decode versionstamp from key: " << hex(key);
error_code_ = TxnErrorCode::TXN_INVALID_DATA;
return false; // Stop iteration on error
}
// Save the current key and version for filtering the split keys
current_key_and_version_ = document_key;
encode_versionstamp(version, &current_key_and_version_);
// Filter out documents newer than snapshot version or duplicate keys
if (snapshot_version_ <= version || document_key == current_key_) {
iter_->next(); // Skip this version
continue;
}
// Found a valid document, reconstruct it
Message msg;
TxnErrorCode code = document_get_versioned_children(iter_.get(), &msg);
if (code != TxnErrorCode::TXN_OK) {
LOG(ERROR) << "Failed to get versioned document for key: " << hex(document_key)
<< ", error code: " << code;
error_code_ = code;
return false;
}
// Successfully found and reconstructed a document
has_find_ = true;
current_key_ = document_key;
current_version_ = version;
current_value_ = std::move(msg);
return true;
}
return is_valid() && has_find_;
}
template <typename Message>
requires IsProtobufMessage<Message>
std::optional<typename DocumentMessageIterator<Message>::Element>
DocumentMessageIterator<Message>::next() {
if (!has_next()) {
return std::nullopt;
}
has_find_ = false; // Reset for next call
return std::make_tuple(current_key_, current_version_, std::ref(current_value_));
}
template <typename Message>
requires IsProtobufMessage<Message>
std::optional<typename DocumentMessageIterator<Message>::Element>
DocumentMessageIterator<Message>::peek() {
if (!has_next()) {
return std::nullopt;
}
return std::make_tuple(current_key_, current_version_, std::ref(current_value_));
}
// Read the specified document messages from the transaction by key range.
//
// The iterator will return the messages in the specified range in reversed order, and it will reconstruct
// the messages using the `split_schema` field of the message if it is split into multiple keys.
//
// The range is specified by the `begin` and `end` keys, which are expected to be the document keys.
// The `begin` key is inclusive, and the `end` key is exclusive by default, caller can change this
// behaviour by setting the range key selector in the options.
//
// ATTN: document keys are expected, rather than document key prefixes!
template <typename Message>
requires IsProtobufMessage<Message>
std::unique_ptr<DocumentMessageIterator<Message>> document_get_range(
Transaction* txn, std::string_view begin, std::string_view end,
const ReadDocumentMessagesOptions& options) {
std::string begin_key(begin);
std::string end_key(end);
if (options.exclude_begin_key) {
encode_versionstamp(Versionstamp::max(), &begin_key);
} else {
encode_versionstamp(Versionstamp::min(), &begin_key);
}
if (options.exclude_end_key) { // the end key is exclusive by default in fdb range get.
encode_versionstamp(Versionstamp::min(), &end_key);
} else {
encode_versionstamp(Versionstamp::max(), &end_key);
}
// Set up FullRangeGetOptions
FullRangeGetOptions range_options;
range_options.reverse = true; // Get latest versions first
range_options.prefetch = true;
range_options.begin_key_selector = RangeKeySelector::FIRST_GREATER_THAN;
range_options.end_key_selector = RangeKeySelector::FIRST_GREATER_THAN;
range_options.snapshot = options.snapshot;
range_options.batch_limit = options.batch_limit;
// Create the underlying range iterator
std::unique_ptr<FullRangeGetIterator> iter =
txn->full_range_get(begin_key, end_key, std::move(range_options));
// Return the document message iterator
return std::make_unique<DocumentMessageIterator<Message>>(std::move(iter),
options.snapshot_version);
}
} // namespace doris::cloud::versioned