blob: 4aed78db8b62dc472836d51b0aa5aafedab1e034 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "fluss.hpp"
#include "lib.rs.h"
#include "ffi_converter.hpp"
#include "rust/cxx.h"
namespace fluss {
Admin::Admin() noexcept = default;
Admin::Admin(ffi::Admin* admin) noexcept : admin_(admin) {}
Admin::~Admin() noexcept { Destroy(); }
void Admin::Destroy() noexcept {
if (admin_) {
ffi::delete_admin(admin_);
admin_ = nullptr;
}
}
Admin::Admin(Admin&& other) noexcept : admin_(other.admin_) {
other.admin_ = nullptr;
}
Admin& Admin::operator=(Admin&& other) noexcept {
if (this != &other) {
Destroy();
admin_ = other.admin_;
other.admin_ = nullptr;
}
return *this;
}
bool Admin::Available() const { return admin_ != nullptr; }
Result Admin::CreateTable(const TablePath& table_path,
const TableDescriptor& descriptor,
bool ignore_if_exists) {
if (!Available()) {
return utils::make_error(1, "Admin not available");
}
auto ffi_path = utils::to_ffi_table_path(table_path);
auto ffi_desc = utils::to_ffi_table_descriptor(descriptor);
auto ffi_result = admin_->create_table(ffi_path, ffi_desc, ignore_if_exists);
return utils::from_ffi_result(ffi_result);
}
Result Admin::DropTable(const TablePath& table_path, bool ignore_if_not_exists) {
if (!Available()) {
return utils::make_error(1, "Admin not available");
}
auto ffi_path = utils::to_ffi_table_path(table_path);
auto ffi_result = admin_->drop_table(ffi_path, ignore_if_not_exists);
return utils::from_ffi_result(ffi_result);
}
Result Admin::GetTable(const TablePath& table_path, TableInfo& out) {
if (!Available()) {
return utils::make_error(1, "Admin not available");
}
auto ffi_path = utils::to_ffi_table_path(table_path);
auto ffi_result = admin_->get_table_info(ffi_path);
auto result = utils::from_ffi_result(ffi_result.result);
if (result.Ok()) {
out = utils::from_ffi_table_info(ffi_result.table_info);
}
return result;
}
Result Admin::GetLatestLakeSnapshot(const TablePath& table_path, LakeSnapshot& out) {
if (!Available()) {
return utils::make_error(1, "Admin not available");
}
auto ffi_path = utils::to_ffi_table_path(table_path);
auto ffi_result = admin_->get_latest_lake_snapshot(ffi_path);
auto result = utils::from_ffi_result(ffi_result.result);
if (result.Ok()) {
out = utils::from_ffi_lake_snapshot(ffi_result.lake_snapshot);
}
return result;
}
// function for common list offsets functionality
Result Admin::DoListOffsets(const TablePath& table_path,
const std::vector<int32_t>& bucket_ids,
const OffsetQuery& offset_query,
std::unordered_map<int32_t, int64_t>& out,
const std::string* partition_name) {
if (!Available()) {
return utils::make_error(1, "Admin not available");
}
auto ffi_path = utils::to_ffi_table_path(table_path);
rust::Vec<int32_t> rust_bucket_ids;
for (int32_t id : bucket_ids) {
rust_bucket_ids.push_back(id);
}
ffi::FfiOffsetQuery ffi_query;
ffi_query.offset_type = static_cast<int32_t>(offset_query.spec);
ffi_query.timestamp = offset_query.timestamp;
ffi::FfiListOffsetsResult ffi_result;
if (partition_name != nullptr) {
ffi_result = admin_->list_partition_offsets(ffi_path, rust::String(*partition_name), std::move(rust_bucket_ids), ffi_query);
} else {
ffi_result = admin_->list_offsets(ffi_path, std::move(rust_bucket_ids), ffi_query);
}
auto result = utils::from_ffi_result(ffi_result.result);
if (result.Ok()) {
out.clear();
for (const auto& pair : ffi_result.bucket_offsets) {
out[pair.bucket_id] = pair.offset;
}
}
return result;
}
Result Admin::ListOffsets(const TablePath& table_path,
const std::vector<int32_t>& bucket_ids,
const OffsetQuery& offset_query,
std::unordered_map<int32_t, int64_t>& out) {
return DoListOffsets(table_path, bucket_ids, offset_query, out);
}
Result Admin::ListPartitionOffsets(const TablePath& table_path,
const std::string& partition_name,
const std::vector<int32_t>& bucket_ids,
const OffsetQuery& offset_query,
std::unordered_map<int32_t, int64_t>& out) {
return DoListOffsets(table_path, bucket_ids, offset_query, out, &partition_name);
}
Result Admin::ListPartitionInfos(const TablePath& table_path,
std::vector<PartitionInfo>& out) {
if (!Available()) {
return utils::make_error(1, "Admin not available");
}
auto ffi_path = utils::to_ffi_table_path(table_path);
auto ffi_result = admin_->list_partition_infos(ffi_path);
auto result = utils::from_ffi_result(ffi_result.result);
if (result.Ok()) {
out.clear();
out.reserve(ffi_result.partition_infos.size());
for (const auto& pi : ffi_result.partition_infos) {
out.push_back({pi.partition_id, std::string(pi.partition_name)});
}
}
return result;
}
Result Admin::CreatePartition(const TablePath& table_path,
const std::unordered_map<std::string, std::string>& partition_spec,
bool ignore_if_exists) {
if (!Available()) {
return utils::make_error(1, "Admin not available");
}
auto ffi_path = utils::to_ffi_table_path(table_path);
rust::Vec<ffi::FfiPartitionKeyValue> rust_spec;
for (const auto& [key, value] : partition_spec) {
ffi::FfiPartitionKeyValue kv;
kv.key = rust::String(key);
kv.value = rust::String(value);
rust_spec.push_back(std::move(kv));
}
auto ffi_result = admin_->create_partition(ffi_path, std::move(rust_spec), ignore_if_exists);
return utils::from_ffi_result(ffi_result);
}
} // namespace fluss