blob: 398272a53732896191463455a20253515ac654fc [file]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "paimon/core/utils/snapshot_manager.h"
#include <algorithm>
#include <chrono>
#include <random>
#include <thread>
#include <utility>
#include "paimon/common/utils/path_util.h"
#include "paimon/common/utils/string_utils.h"
#include "paimon/core/snapshot.h"
#include "paimon/core/utils/branch_manager.h"
#include "paimon/core/utils/file_utils.h"
#include "paimon/fs/file_system.h"
#include "paimon/result.h"
namespace paimon {
SnapshotManager::SnapshotManager(const std::shared_ptr<FileSystem>& fs,
const std::string& root_path)
: SnapshotManager(fs, root_path, BranchManager::DEFAULT_MAIN_BRANCH) {}
SnapshotManager::SnapshotManager(const std::shared_ptr<FileSystem>& fs,
const std::string& root_path, const std::string& branch)
: fs_(fs), root_path_(root_path), branch_(BranchManager::NormalizeBranch(branch)) {}
SnapshotManager::~SnapshotManager() = default;
const std::shared_ptr<FileSystem>& SnapshotManager::Fs() const {
return fs_;
}
const std::string& SnapshotManager::RootPath() const {
return root_path_;
}
const std::string& SnapshotManager::Branch() const {
return branch_;
}
Result<std::optional<Snapshot>> SnapshotManager::LatestSnapshotOfUser(const std::string& user) {
PAIMON_ASSIGN_OR_RAISE(std::optional<int64_t> latest_id, LatestSnapshotId());
if (latest_id == std::nullopt) {
return std::optional<Snapshot>();
}
PAIMON_ASSIGN_OR_RAISE(std::optional<int64_t> earliest_id, EarliestSnapshotId());
if (earliest_id == std::nullopt) {
return Status::Invalid(
"Latest snapshot id is not null, but earliest snapshot id is null. This is "
"unexpected.");
}
for (int64_t id = latest_id.value(); id >= earliest_id.value(); id--) {
PAIMON_ASSIGN_OR_RAISE(Snapshot snapshot, LoadSnapshot(id));
if (snapshot.CommitUser() == user) {
return std::optional<Snapshot>(snapshot);
}
}
return std::optional<Snapshot>();
}
Result<Snapshot> SnapshotManager::LoadSnapshot(int64_t snapshot_id) const {
return Snapshot::FromPath(fs_, SnapshotPath(snapshot_id));
}
Result<std::optional<Snapshot>> SnapshotManager::LatestSnapshot() const {
PAIMON_ASSIGN_OR_RAISE(std::optional<int64_t> snapshot_id, LatestSnapshotId());
if (snapshot_id == std::nullopt) {
return std::optional<Snapshot>();
} else {
PAIMON_ASSIGN_OR_RAISE(Snapshot snapshot, LoadSnapshot(snapshot_id.value()));
return std::optional<Snapshot>(snapshot);
}
}
Result<std::optional<int64_t>> SnapshotManager::LatestSnapshotId() const {
return FindLatest(
SnapshotDirectory(), std::string(SNAPSHOT_PREFIX),
[this](int64_t snapshot_id) -> std::string { return SnapshotPath(snapshot_id); });
}
Result<std::optional<int64_t>> SnapshotManager::EarliestSnapshotId() const {
return FindEarliest(
SnapshotDirectory(), std::string(SNAPSHOT_PREFIX),
[this](int64_t snapshot_id) -> std::string { return SnapshotPath(snapshot_id); });
}
std::string SnapshotManager::BranchPath() const {
return BranchManager::BranchPath(root_path_, branch_);
}
std::string SnapshotManager::SnapshotDirectory() const {
return PathUtil::JoinPath(BranchPath(), "/snapshot");
}
Result<bool> SnapshotManager::SnapshotExists(int64_t snapshot_id) const {
return fs_->Exists(SnapshotPath(snapshot_id));
}
std::string SnapshotManager::SnapshotPath(int64_t snapshot_id) const {
return PathUtil::JoinPath(
BranchPath(), "/snapshot/" + std::string(SNAPSHOT_PREFIX) + std::to_string(snapshot_id));
}
Result<std::optional<int64_t>> SnapshotManager::FindEarliest(
const std::string& dir, const std::string& prefix,
const std::function<std::string(int64_t)>& path_func) const {
PAIMON_ASSIGN_OR_RAISE(bool is_exist, fs_->Exists(dir));
if (!is_exist) {
return std::optional<int64_t>();
}
std::optional<int64_t> snapshot_id = ReadHint(EARLIEST, dir);
if (snapshot_id != std::nullopt) {
std::string path = path_func(snapshot_id.value());
PAIMON_ASSIGN_OR_RAISE(bool is_exist, fs_->Exists(path));
if (is_exist) {
return snapshot_id;
}
}
return FindByListFiles([](int64_t lhs, int64_t rhs) -> int64_t { return std::min(lhs, rhs); },
dir, prefix);
}
Result<std::optional<int64_t>> SnapshotManager::FindLatest(
const std::string& dir, const std::string& prefix,
const std::function<std::string(int64_t)>& path_func) const {
PAIMON_ASSIGN_OR_RAISE(bool is_exist, fs_->Exists(dir));
if (!is_exist) {
return std::optional<int64_t>();
}
std::optional<int64_t> snapshot_id = ReadHint(LATEST, dir);
if (snapshot_id != std::nullopt && snapshot_id.value() > 0) {
int64_t next_snapshot = snapshot_id.value() + 1;
// it is the latest only there is no next one
std::string path = path_func(next_snapshot);
PAIMON_ASSIGN_OR_RAISE(bool is_exist, fs_->Exists(path));
if (!is_exist) {
return snapshot_id;
}
}
return FindByListFiles([](int64_t lhs, int64_t rhs) -> int64_t { return std::max(lhs, rhs); },
dir, prefix);
}
Result<std::optional<int64_t>> SnapshotManager::FindByListFiles(
const std::function<int64_t(int64_t, int64_t)> reducer_func, const std::string& dir,
const std::string& prefix) const {
std::vector<int64_t> versions;
PAIMON_RETURN_NOT_OK(FileUtils::ListVersionedFiles(fs_, dir, prefix, &versions));
if (versions.empty()) {
return std::optional<int64_t>();
}
int64_t ret = versions[0];
for (const auto& version : versions) {
ret = reducer_func(ret, version);
}
return std::optional<int64_t>(ret);
}
Result<std::set<std::string>> SnapshotManager::TryGetNonSnapshotFiles(int64_t older_than_ms) const {
std::set<std::string> non_snapshot_files;
std::vector<std::unique_ptr<FileStatus>> file_status_list;
PAIMON_RETURN_NOT_OK(fs_->ListFileStatus(SnapshotDirectory(), &file_status_list));
for (const auto& file_status : file_status_list) {
std::string file_name = PathUtil::GetName(file_status->GetPath());
if (!StringUtils::StartsWith(file_name, std::string(SNAPSHOT_PREFIX)) &&
file_name != std::string(EARLIEST) && file_name != std::string(LATEST)) {
if (file_status->GetModificationTime() < older_than_ms) {
non_snapshot_files.insert(file_status->GetPath());
}
}
}
return non_snapshot_files;
}
// Noted that: try best to read hint to avoid unnecessary list dir operation, if failed, do not
// return error
std::optional<int64_t> SnapshotManager::ReadHint(const std::string& file_name,
const std::string& dir) const {
std::string path = PathUtil::JoinPath(dir, file_name);
int32_t retry_number = 0;
while (retry_number++ < READ_HINT_RETRY_NUM) {
std::string content;
Status s = fs_->ReadFile(path, &content);
if (s.ok()) {
return StringUtils::StringToValue<int64_t>(content);
}
std::this_thread::sleep_for(std::chrono::milliseconds(READ_HINT_RETRY_INTERVAL));
}
return std::nullopt;
}
Status SnapshotManager::CommitLatestHint(int64_t snapshot_id) {
return CommitHint(snapshot_id, LATEST, SnapshotDirectory());
}
Status SnapshotManager::CommitEarliestHint(int64_t snapshot_id) {
return CommitHint(snapshot_id, EARLIEST, SnapshotDirectory());
}
Status SnapshotManager::CommitHint(int64_t snapshot_id, const std::string& file_name,
const std::string& dir) {
std::string path = PathUtil::JoinPath(dir, file_name);
std::string snapshot_id_str = std::to_string(snapshot_id);
int32_t loop_time = 3;
Status s;
while (loop_time-- > 0) {
s = fs_->WriteFile(path, snapshot_id_str, /*overwrite=*/true);
if (s.ok()) {
return s;
} else {
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<int> dist(0, 999);
int64_t sleep_time = dist(gen) + 500;
std::this_thread::sleep_for(std::chrono::milliseconds(sleep_time));
}
}
return s;
}
Result<std::vector<Snapshot>> SnapshotManager::GetAllSnapshots() const {
std::vector<Snapshot> snapshots;
std::vector<std::unique_ptr<BasicFileStatus>> file_statuses;
PAIMON_RETURN_NOT_OK(FileUtils::ListVersionedFileStatus(fs_, SnapshotDirectory(),
SNAPSHOT_PREFIX, &file_statuses));
for (const auto& file_status : file_statuses) {
auto snapshot_path = file_status->GetPath();
PAIMON_ASSIGN_OR_RAISE(Snapshot snapshot, Snapshot::FromPath(fs_, snapshot_path));
snapshots.push_back(snapshot);
}
return snapshots;
}
Result<std::optional<Snapshot>> SnapshotManager::EarlierOrEqualTimeMillis(
int64_t timestamp_millis) const {
return FindSnapshotBeforeTimestamp(timestamp_millis, std::less_equal<int64_t>{});
}
Result<std::optional<Snapshot>> SnapshotManager::EarlierThanTimeMillis(
int64_t timestamp_millis) const {
return FindSnapshotBeforeTimestamp(timestamp_millis, std::less<int64_t>{});
}
Result<std::optional<Snapshot>> SnapshotManager::FindSnapshotBeforeTimestamp(
int64_t timestamp_millis, const std::function<bool(int64_t, int64_t)>& compare) const {
PAIMON_ASSIGN_OR_RAISE(std::optional<int64_t> latest_id, LatestSnapshotId());
if (latest_id == std::nullopt) {
return std::optional<Snapshot>();
}
PAIMON_ASSIGN_OR_RAISE(std::optional<int64_t> earliest_id, EarliestSnapshotId());
if (earliest_id == std::nullopt) {
return std::optional<Snapshot>();
}
PAIMON_ASSIGN_OR_RAISE(Snapshot earliest_snapshot, LoadSnapshot(earliest_id.value()));
if (!compare(earliest_snapshot.TimeMillis(), timestamp_millis)) {
return std::optional<Snapshot>();
}
int64_t lo = earliest_id.value();
int64_t hi = latest_id.value();
std::optional<Snapshot> result;
while (lo <= hi) {
int64_t mid = lo + (hi - lo) / 2;
PAIMON_ASSIGN_OR_RAISE(Snapshot snapshot, LoadSnapshot(mid));
if (compare(snapshot.TimeMillis(), timestamp_millis)) {
lo = mid + 1;
result = std::move(snapshot);
} else {
hi = mid - 1;
}
}
return result;
}
} // namespace paimon