blob: a1f1749d0f165a8d7e4e9a0d23a51ee6d4e4c664 [file] [log] [blame]
/*
* Copyright 2024-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "paimon/core/snapshot.h"
#include <cassert>
#include <stdexcept>
#include <utility>
#include "paimon/common/utils/rapidjson_util.h"
#include "paimon/fs/file_system.h"
#include "paimon/result.h"
#include "paimon/status.h"
#include "rapidjson/allocators.h"
#include "rapidjson/document.h"
#include "rapidjson/rapidjson.h"
namespace paimon {
/// Returns the commit kind whose raw value is 0 (rendered as "APPEND" by ToString).
/// The instance is created once, on first call, and copied out on every call.
const Snapshot::CommitKind Snapshot::CommitKind::Append() {
    static const Snapshot::CommitKind kAppend(0);
    return kAppend;
}
/// Returns the commit kind whose raw value is 1 (rendered as "COMPACT" by ToString).
/// The instance is created once, on first call, and copied out on every call.
const Snapshot::CommitKind Snapshot::CommitKind::Compact() {
    static const Snapshot::CommitKind kCompact(1);
    return kCompact;
}
/// Returns the commit kind whose raw value is 2 (rendered as "OVERWRITE" by ToString).
/// The instance is created once, on first call, and copied out on every call.
const Snapshot::CommitKind Snapshot::CommitKind::Overwrite() {
    static const Snapshot::CommitKind kOverwrite(2);
    return kOverwrite;
}
/// Returns the commit kind whose raw value is 3 (rendered as "ANALYZE" by ToString).
/// The instance is created once, on first call, and copied out on every call.
const Snapshot::CommitKind Snapshot::CommitKind::Analyze() {
    static const Snapshot::CommitKind kAnalyze(3);
    return kAnalyze;
}
/// Returns the sentinel commit kind whose raw value is -1. FromString hands this
/// back for names it does not recognise.
const Snapshot::CommitKind Snapshot::CommitKind::Unknown() {
    static const Snapshot::CommitKind kUnknown(-1);
    return kUnknown;
}
bool Snapshot::TEST_Equal(const Snapshot& other) const {
if (this == &other) {
return true;
}
if ((base_manifest_list_size_ && !other.base_manifest_list_size_) ||
(!base_manifest_list_size_ && other.base_manifest_list_size_)) {
return false;
}
if ((delta_manifest_list_size_ && !other.delta_manifest_list_size_) ||
(!delta_manifest_list_size_ && other.delta_manifest_list_size_)) {
return false;
}
if ((changelog_manifest_list_ && !other.changelog_manifest_list_) ||
(!changelog_manifest_list_ && other.changelog_manifest_list_)) {
return false;
}
if ((changelog_manifest_list_size_ && !other.changelog_manifest_list_size_) ||
(!changelog_manifest_list_size_ && other.changelog_manifest_list_size_)) {
return false;
}
return version_ == other.version_ && id_ == other.id_ && schema_id_ == other.schema_id_ &&
index_manifest_ == other.index_manifest_ && commit_user_ == other.commit_user_ &&
commit_identifier_ == other.commit_identifier_ && commit_kind_ == other.commit_kind_ &&
log_offsets_ == other.log_offsets_ && total_record_count_ == other.total_record_count_ &&
delta_record_count_ == other.delta_record_count_ &&
changelog_record_count_ == other.changelog_record_count_ &&
watermark_ == other.watermark_ && statistics_ == other.statistics_ &&
properties_ == other.properties_ && next_row_id_ == other.next_row_id_;
}
/// Strict equality: every data member of the two snapshots must compare equal.
///
/// @param other snapshot to compare against.
/// @return true when `other` is this very object or all members are equal.
bool Snapshot::operator==(const Snapshot& other) const {
    if (&other == this) {
        return true;
    }

    // Version and ids.
    if (!(version_ == other.version_ && id_ == other.id_ && schema_id_ == other.schema_id_)) {
        return false;
    }

    // Manifest lists together with their optional sizes, plus the index manifest.
    if (!(base_manifest_list_ == other.base_manifest_list_ &&
          base_manifest_list_size_ == other.base_manifest_list_size_ &&
          delta_manifest_list_ == other.delta_manifest_list_ &&
          delta_manifest_list_size_ == other.delta_manifest_list_size_ &&
          changelog_manifest_list_ == other.changelog_manifest_list_ &&
          changelog_manifest_list_size_ == other.changelog_manifest_list_size_ &&
          index_manifest_ == other.index_manifest_)) {
        return false;
    }

    // Commit description.
    if (!(commit_user_ == other.commit_user_ &&
          commit_identifier_ == other.commit_identifier_ &&
          commit_kind_ == other.commit_kind_ && time_millis_ == other.time_millis_)) {
        return false;
    }

    // Offsets and record counters.
    if (!(log_offsets_ == other.log_offsets_ &&
          total_record_count_ == other.total_record_count_ &&
          delta_record_count_ == other.delta_record_count_ &&
          changelog_record_count_ == other.changelog_record_count_)) {
        return false;
    }

    // Remaining optional metadata.
    return watermark_ == other.watermark_ && statistics_ == other.statistics_ &&
           properties_ == other.properties_ && next_row_id_ == other.next_row_id_;
}
/// Converts a commit kind to its textual name.
///
/// Raw values 0, 1, 2 and 3 map to "APPEND", "COMPACT", "OVERWRITE" and
/// "ANALYZE". Any other raw value trips an assertion when assertions are
/// enabled; with assertions disabled the function returns "UNKNOWN".
///
/// @param kind commit kind to render.
/// @return the name associated with `kind`.
std::string Snapshot::CommitKind::ToString(const Snapshot::CommitKind& kind) {
    if (kind.value_ == 0) {
        return "APPEND";
    }
    if (kind.value_ == 1) {
        return "COMPACT";
    }
    if (kind.value_ == 2) {
        return "OVERWRITE";
    }
    if (kind.value_ == 3) {
        return "ANALYZE";
    }
    // No known kind carries this raw value.
    assert(false);
    return "UNKNOWN";
}
/// Parses the textual name of a commit kind.
///
/// Recognised names are "APPEND", "COMPACT", "OVERWRITE" and "ANALYZE"
/// (exact, case-sensitive match). Any other text yields Unknown().
///
/// The argument may come straight from external data (Snapshot::FromJson feeds
/// it the value read from the JSON document), so an unrecognised name is
/// reported through the Unknown() return value instead of an assertion. That
/// keeps the caller's own error handling reachable in builds where assertions
/// are enabled.
///
/// @param kind textual commit kind.
/// @return the matching commit kind, or Unknown() when `kind` is not recognised.
Snapshot::CommitKind Snapshot::CommitKind::FromString(const std::string& kind) {
    if (kind == "APPEND") {
        return Append();
    }
    if (kind == "COMPACT") {
        return Compact();
    }
    if (kind == "OVERWRITE") {
        return Overwrite();
    }
    if (kind == "ANALYZE") {
        return Analyze();
    }
    return Unknown();
}
/// Builds a snapshot by copying every argument into the data member of the
/// same name (with a trailing underscore). No validation or normalisation is
/// performed here: optional arguments that are empty stay empty in the object.
Snapshot::Snapshot(const std::optional<int32_t>& version, int64_t id, int64_t schema_id,
const std::string& base_manifest_list,
const std::optional<int64_t>& base_manifest_list_size,
const std::string& delta_manifest_list,
const std::optional<int64_t>& delta_manifest_list_size,
const std::optional<std::string>& changelog_manifest_list,
const std::optional<int64_t>& changelog_manifest_list_size,
const std::optional<std::string>& index_manifest, const std::string& commit_user,
int64_t commit_identifier, CommitKind commit_kind, int64_t time_millis,
const std::optional<std::map<int32_t, int64_t>>& log_offsets,
const std::optional<int64_t>& total_record_count,
const std::optional<int64_t>& delta_record_count,
const std::optional<int64_t>& changelog_record_count,
const std::optional<int64_t>& watermark,
const std::optional<std::string>& statistics,
const std::optional<std::map<std::string, std::string>>& properties,
const std::optional<int64_t>& next_row_id)
// Initializers appear in the same order as the constructor parameters.
: version_(version),
id_(id),
schema_id_(schema_id),
base_manifest_list_(base_manifest_list),
base_manifest_list_size_(base_manifest_list_size),
delta_manifest_list_(delta_manifest_list),
delta_manifest_list_size_(delta_manifest_list_size),
changelog_manifest_list_(changelog_manifest_list),
changelog_manifest_list_size_(changelog_manifest_list_size),
index_manifest_(index_manifest),
commit_user_(commit_user),
commit_identifier_(commit_identifier),
commit_kind_(commit_kind),
time_millis_(time_millis),
log_offsets_(log_offsets),
total_record_count_(total_record_count),
delta_record_count_(delta_record_count),
changelog_record_count_(changelog_record_count),
watermark_(watermark),
statistics_(statistics),
properties_(properties),
next_row_id_(next_row_id) {}
/// Serializes this snapshot into a JSON object.
///
/// Always written: version (via Version()), id, schema id, base and delta
/// manifest lists, changelog manifest list (the optional itself is handed to
/// the serializer), commit user, commit identifier, commit kind (as text) and
/// commit time.
///
/// Written only when the corresponding optional holds a value: the three
/// manifest-list sizes, index manifest, log offsets, total / delta / changelog
/// record counts, watermark, statistics, properties and next row id.
///
/// The total and delta record counts are guarded like the other optional
/// fields: FromJson reads both as optionals, so a snapshot loaded from a
/// document lacking them must serialize again without raising
/// std::bad_optional_access.
///
/// @param allocator allocator used for every value added to the object; it is
///                  dereferenced unconditionally and must not be null.
/// @return the populated JSON object.
rapidjson::Value Snapshot::ToJson(rapidjson::Document::AllocatorType* allocator) const
    noexcept(false) {
    rapidjson::Value obj(rapidjson::kObjectType);
    obj.AddMember(rapidjson::StringRef(FIELD_VERSION),
                  RapidJsonUtil::SerializeValue(Version(), allocator).Move(), *allocator);
    obj.AddMember(rapidjson::StringRef(FIELD_ID),
                  RapidJsonUtil::SerializeValue(id_, allocator).Move(), *allocator);
    obj.AddMember(rapidjson::StringRef(FIELD_SCHEMA_ID),
                  RapidJsonUtil::SerializeValue(schema_id_, allocator).Move(), *allocator);
    obj.AddMember(rapidjson::StringRef(FIELD_BASE_MANIFEST_LIST),
                  RapidJsonUtil::SerializeValue(base_manifest_list_, allocator).Move(),
                  *allocator);
    if (base_manifest_list_size_) {
        obj.AddMember(
            rapidjson::StringRef(FIELD_BASE_MANIFEST_LIST_SIZE),
            RapidJsonUtil::SerializeValue(base_manifest_list_size_.value(), allocator).Move(),
            *allocator);
    }
    obj.AddMember(rapidjson::StringRef(FIELD_DELTA_MANIFEST_LIST),
                  RapidJsonUtil::SerializeValue(delta_manifest_list_, allocator).Move(),
                  *allocator);
    if (delta_manifest_list_size_) {
        obj.AddMember(rapidjson::StringRef(FIELD_DELTA_MANIFEST_LIST_SIZE),
                      RapidJsonUtil::SerializeValue(delta_manifest_list_size_, allocator).Move(),
                      *allocator);
    }
    // The changelog manifest list is emitted unconditionally; the serializer
    // receives the optional itself.
    obj.AddMember(rapidjson::StringRef(FIELD_CHANGELOG_MANIFEST_LIST),
                  RapidJsonUtil::SerializeValue(changelog_manifest_list_, allocator).Move(),
                  *allocator);
    if (changelog_manifest_list_size_) {
        obj.AddMember(
            rapidjson::StringRef(FIELD_CHANGELOG_MANIFEST_LIST_SIZE),
            RapidJsonUtil::SerializeValue(changelog_manifest_list_size_, allocator).Move(),
            *allocator);
    }
    if (index_manifest_ != std::nullopt) {
        obj.AddMember(rapidjson::StringRef(FIELD_INDEX_MANIFEST),
                      RapidJsonUtil::SerializeValue(index_manifest_.value(), allocator).Move(),
                      *allocator);
    }
    obj.AddMember(rapidjson::StringRef(FIELD_COMMIT_USER),
                  RapidJsonUtil::SerializeValue(commit_user_, allocator).Move(), *allocator);
    obj.AddMember(rapidjson::StringRef(FIELD_COMMIT_IDENTIFIER),
                  RapidJsonUtil::SerializeValue(commit_identifier_, allocator).Move(),
                  *allocator);
    obj.AddMember(
        rapidjson::StringRef(FIELD_COMMIT_KIND),
        RapidJsonUtil::SerializeValue(Snapshot::CommitKind::ToString(commit_kind_), allocator)
            .Move(),
        *allocator);
    obj.AddMember(rapidjson::StringRef(FIELD_TIME_MILLIS),
                  RapidJsonUtil::SerializeValue(time_millis_, allocator).Move(), *allocator);
    if (log_offsets_ != std::nullopt) {
        obj.AddMember(rapidjson::StringRef(FIELD_LOG_OFFSETS),
                      RapidJsonUtil::SerializeValue(log_offsets_.value(), allocator).Move(),
                      *allocator);
    }
    // Both counters are optionals; an unchecked value() on an empty one throws.
    if (total_record_count_ != std::nullopt) {
        obj.AddMember(
            rapidjson::StringRef(FIELD_TOTAL_RECORD_COUNT),
            RapidJsonUtil::SerializeValue(total_record_count_.value(), allocator).Move(),
            *allocator);
    }
    if (delta_record_count_ != std::nullopt) {
        obj.AddMember(
            rapidjson::StringRef(FIELD_DELTA_RECORD_COUNT),
            RapidJsonUtil::SerializeValue(delta_record_count_.value(), allocator).Move(),
            *allocator);
    }
    if (changelog_record_count_ != std::nullopt) {
        obj.AddMember(
            rapidjson::StringRef(FIELD_CHANGELOG_RECORD_COUNT),
            RapidJsonUtil::SerializeValue(changelog_record_count_.value(), allocator).Move(),
            *allocator);
    }
    if (watermark_ != std::nullopt) {
        obj.AddMember(rapidjson::StringRef(FIELD_WATERMARK),
                      RapidJsonUtil::SerializeValue(watermark_.value(), allocator).Move(),
                      *allocator);
    }
    if (statistics_ != std::nullopt) {
        obj.AddMember(rapidjson::StringRef(FIELD_STATISTICS),
                      RapidJsonUtil::SerializeValue(statistics_.value(), allocator).Move(),
                      *allocator);
    }
    if (properties_ != std::nullopt) {
        obj.AddMember(rapidjson::StringRef(FIELD_PROPERTIES),
                      RapidJsonUtil::SerializeValue(properties_.value(), allocator).Move(),
                      *allocator);
    }
    if (next_row_id_ != std::nullopt) {
        obj.AddMember(rapidjson::StringRef(FIELD_NEXT_ROW_ID),
                      RapidJsonUtil::SerializeValue(next_row_id_.value(), allocator).Move(),
                      *allocator);
    }
    return obj;
}
/// Fills this snapshot from a JSON object, overwriting every data member.
///
/// Fields are read one by one through RapidJsonUtil::DeserializeKeyValue, in
/// declaration order. The version is read with -1 passed as the extra fallback
/// argument. Members that are optionals are read as optionals.
///
/// @param obj JSON object holding the snapshot fields.
/// @throws std::invalid_argument when the commit-kind text maps to
///         CommitKind::Unknown(). Members read before that point have already
///         been overwritten by then.
void Snapshot::FromJson(const rapidjson::Value& obj) noexcept(false) {
    using OptInt64 = std::optional<int64_t>;
    using OptString = std::optional<std::string>;
    using OptLogOffsets = std::optional<std::map<int32_t, int64_t>>;
    using OptProperties = std::optional<std::map<std::string, std::string>>;

    // Version and ids.
    version_ = RapidJsonUtil::DeserializeKeyValue<int32_t>(obj, FIELD_VERSION, -1);
    id_ = RapidJsonUtil::DeserializeKeyValue<int64_t>(obj, FIELD_ID);
    schema_id_ = RapidJsonUtil::DeserializeKeyValue<int64_t>(obj, FIELD_SCHEMA_ID);

    // Manifest lists and their optional sizes.
    base_manifest_list_ =
        RapidJsonUtil::DeserializeKeyValue<std::string>(obj, FIELD_BASE_MANIFEST_LIST);
    base_manifest_list_size_ =
        RapidJsonUtil::DeserializeKeyValue<OptInt64>(obj, FIELD_BASE_MANIFEST_LIST_SIZE);
    delta_manifest_list_ =
        RapidJsonUtil::DeserializeKeyValue<std::string>(obj, FIELD_DELTA_MANIFEST_LIST);
    delta_manifest_list_size_ =
        RapidJsonUtil::DeserializeKeyValue<OptInt64>(obj, FIELD_DELTA_MANIFEST_LIST_SIZE);
    changelog_manifest_list_ =
        RapidJsonUtil::DeserializeKeyValue<OptString>(obj, FIELD_CHANGELOG_MANIFEST_LIST);
    changelog_manifest_list_size_ =
        RapidJsonUtil::DeserializeKeyValue<OptInt64>(obj, FIELD_CHANGELOG_MANIFEST_LIST_SIZE);
    index_manifest_ = RapidJsonUtil::DeserializeKeyValue<OptString>(obj, FIELD_INDEX_MANIFEST);

    // Commit description.
    commit_user_ = RapidJsonUtil::DeserializeKeyValue<std::string>(obj, FIELD_COMMIT_USER);
    commit_identifier_ =
        RapidJsonUtil::DeserializeKeyValue<int64_t>(obj, FIELD_COMMIT_IDENTIFIER);
    const std::string kind_name =
        RapidJsonUtil::DeserializeKeyValue<std::string>(obj, FIELD_COMMIT_KIND);
    commit_kind_ = Snapshot::CommitKind::FromString(kind_name);
    if (commit_kind_ == Snapshot::CommitKind::Unknown()) {
        // The stored text is not one of the recognised commit kinds.
        throw std::invalid_argument("deserialize CommitKind failed");
    }
    time_millis_ = RapidJsonUtil::DeserializeKeyValue<int64_t>(obj, FIELD_TIME_MILLIS);

    // Offsets and record counters.
    log_offsets_ = RapidJsonUtil::DeserializeKeyValue<OptLogOffsets>(obj, FIELD_LOG_OFFSETS);
    total_record_count_ =
        RapidJsonUtil::DeserializeKeyValue<OptInt64>(obj, FIELD_TOTAL_RECORD_COUNT);
    delta_record_count_ =
        RapidJsonUtil::DeserializeKeyValue<OptInt64>(obj, FIELD_DELTA_RECORD_COUNT);
    changelog_record_count_ =
        RapidJsonUtil::DeserializeKeyValue<OptInt64>(obj, FIELD_CHANGELOG_RECORD_COUNT);

    // Remaining optional metadata.
    watermark_ = RapidJsonUtil::DeserializeKeyValue<OptInt64>(obj, FIELD_WATERMARK);
    statistics_ = RapidJsonUtil::DeserializeKeyValue<OptString>(obj, FIELD_STATISTICS);
    properties_ = RapidJsonUtil::DeserializeKeyValue<OptProperties>(obj, FIELD_PROPERTIES);
    next_row_id_ = RapidJsonUtil::DeserializeKeyValue<OptInt64>(obj, FIELD_NEXT_ROW_ID);
}
/// Loads a snapshot from a file.
///
/// Reads the whole file at `path` through `fs` into a string and passes that
/// string to Snapshot::FromJsonString. A failure from either step is returned
/// to the caller by the PAIMON_* macros.
///
/// @param fs   file system used to read the file; it is dereferenced without a
///             null check.
/// @param path location of the snapshot file.
/// @return the parsed snapshot, or the error from reading or parsing.
Result<Snapshot> Snapshot::FromPath(const std::shared_ptr<FileSystem>& fs,
                                    const std::string& path) {
    std::string file_content;
    PAIMON_RETURN_NOT_OK(fs->ReadFile(path, &file_content));
    PAIMON_ASSIGN_OR_RAISE(Snapshot parsed, Snapshot::FromJsonString(file_content));
    return parsed;
}
} // namespace paimon