// blob: d08f497cfa37b0ecab15f0c5a59e869727ee6c45 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "iceberg/update/fast_append.h"
#include <iterator>
#include <vector>
#include "iceberg/constants.h"
#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/manifest/manifest_util_internal.h"
#include "iceberg/snapshot.h"
#include "iceberg/table.h"
#include "iceberg/table_metadata.h"
#include "iceberg/table_properties.h"
#include "iceberg/transaction.h"
#include "iceberg/util/error_collector.h"
#include "iceberg/util/macros.h"
namespace iceberg {
// Factory for FastAppend.
//
// Both arguments are validated with ICEBERG_PRECHECK before anything is allocated:
// `table_name` must be non-empty and `ctx` must be non-null. On success the returned
// FastAppend takes over both arguments.
//
// NOTE(review): ICEBERG_PRECHECK is defined elsewhere; presumably it returns an error
// Result from this function when its condition is false -- confirm in macros.h.
Result<std::unique_ptr<FastAppend>> FastAppend::Make(
    std::string table_name, std::shared_ptr<TransactionContext> ctx) {
  ICEBERG_PRECHECK(!table_name.empty(), "Table name cannot be empty");
  ICEBERG_PRECHECK(ctx != nullptr, "Cannot create FastAppend without a context");

  return std::unique_ptr<FastAppend>{
      new FastAppend(std::move(table_name), std::move(ctx))};
}
// Hands the transaction context to the SnapshotUpdate base and keeps the table name.
// Performs no validation itself; Make() checks its arguments before calling this.
FastAppend::FastAppend(std::string table_name, std::shared_ptr<TransactionContext> ctx)
    : SnapshotUpdate(std::move(ctx)),
      table_name_(std::move(table_name)) {
}
// Queues a data file to be appended by this update.
//
// `file` must be non-null and must carry a partition spec ID that Spec() can resolve.
// Files are grouped by spec ID. If the per-spec set reports that nothing was inserted,
// the call changes neither the "new files" flag nor the summary.
//
// Returns *this so calls can be chained.
// NOTE(review): the ICEBERG_BUILDER_* macros are defined elsewhere; presumably they
// record the failure and return *this early -- confirm in the macro definitions.
FastAppend& FastAppend::AppendFile(const std::shared_ptr<DataFile>& file) {
  ICEBERG_BUILDER_CHECK(file != nullptr, "Invalid data file: null");
  ICEBERG_BUILDER_CHECK(file->partition_spec_id.has_value(),
                        "Data file must have partition spec ID");

  int32_t file_spec_id = file->partition_spec_id.value();
  // Resolve the spec first: a failure here leaves the per-spec map untouched.
  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto partition_spec, Spec(file_spec_id));

  // operator[] creates the per-spec set on first use.
  auto [position, newly_added] = new_data_files_by_spec_[file_spec_id].insert(file);
  if (!newly_added) {
    return *this;
  }

  // Marks previously written manifests as stale; see WriteNewManifests().
  has_new_files_ = true;
  ICEBERG_BUILDER_RETURN_IF_ERROR(summary_.AddedFile(*partition_spec, *file));
  return *this;
}
// Queues an existing manifest to be appended by this update.
//
// The manifest is rejected if it contains existing or deleted files, or if it already
// has a snapshot ID or sequence number assigned (both are assigned during commit).
//
// If can_inherit_snapshot_id() holds and the manifest has no snapshot ID, it is
// recorded in the summary and kept as-is in append_manifests_. Otherwise a copy is
// produced through CopyManifest() and stored in rewritten_append_manifests_.
//
// Returns *this so calls can be chained.
FastAppend& FastAppend::AppendManifest(const ManifestFile& manifest) {
  ICEBERG_BUILDER_CHECK(!manifest.has_existing_files(),
                        "Cannot append manifest with existing files");
  ICEBERG_BUILDER_CHECK(!manifest.has_deleted_files(),
                        "Cannot append manifest with deleted files");
  ICEBERG_BUILDER_CHECK(manifest.added_snapshot_id == kInvalidSnapshotId,
                        "Snapshot id must be assigned during commit");
  ICEBERG_BUILDER_CHECK(manifest.sequence_number == kInvalidSequenceNumber,
                        "Sequence number must be assigned during commit");

  const bool usable_unchanged =
      can_inherit_snapshot_id() && manifest.added_snapshot_id == kInvalidSnapshotId;
  if (!usable_unchanged) {
    // The manifest must be rewritten with this update's snapshot ID
    ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto rewritten, CopyManifest(manifest));
    rewritten_append_manifests_.push_back(std::move(rewritten));
    return *this;
  }

  summary_.AddedManifest(manifest);
  append_manifests_.push_back(manifest);
  return *this;
}
// Name of the operation this update performs: DataOperation::kAppend.
std::string FastAppend::operation() {
  return DataOperation::kAppend;
}
// Builds the list of manifests for the snapshot this update will produce.
//
// The result is ordered as:
//   1. manifests returned by WriteNewManifests() (for files added via AppendFile),
//   2. manifests appended as-is (append_manifests_),
//   3. manifests rewritten on append (rewritten_append_manifests_),
//   4. every manifest of `snapshot`, when `snapshot` is non-null.
//
// Groups 2 and 3 are stamped in place with SnapshotId() before being copied, so the
// stored member entries are modified as well. `metadata_to_update` is not used here.
//
// Errors from WriteNewManifests() or from loading the snapshot's manifests are
// propagated through ICEBERG_ASSIGN_OR_RAISE.
Result<std::vector<ManifestFile>> FastAppend::Apply(
    const TableMetadata& metadata_to_update, const std::shared_ptr<Snapshot>& snapshot) {
  // Run this first: if writing fails, no stored manifest has been stamped yet.
  ICEBERG_ASSIGN_OR_RAISE(auto fresh_manifests, WriteNewManifests());

  std::vector<ManifestFile> result;
  result.reserve(fresh_manifests.size() + append_manifests_.size() +
                 rewritten_append_manifests_.size());

  for (auto& written : fresh_manifests) {
    result.push_back(std::move(written));
  }

  // Stamp each appended / rewritten manifest with this update's snapshot ID and copy
  // it into the result; the members keep their (now stamped) entries.
  const int64_t commit_snapshot_id = SnapshotId();
  for (auto& appended : append_manifests_) {
    appended.added_snapshot_id = commit_snapshot_id;
    result.push_back(appended);
  }
  for (auto& rewritten : rewritten_append_manifests_) {
    rewritten.added_snapshot_id = commit_snapshot_id;
    result.push_back(rewritten);
  }

  if (snapshot == nullptr) {
    return result;
  }

  // Carry over every manifest from the given snapshot.
  auto snapshot_cache = SnapshotCache(snapshot.get());
  ICEBERG_ASSIGN_OR_RAISE(auto carried_over,
                          snapshot_cache.Manifests(ctx_->table->io()));
  result.insert(result.end(), carried_over.begin(), carried_over.end());
  return result;
}
// Produces the summary map for this update.
//
// The partition summary limit is read from the base metadata's table properties
// (TableProperties::kWritePartitionSummaryLimit) and applied to the summary builder
// immediately before building.
std::unordered_map<std::string, std::string> FastAppend::Summary() {
  const auto partition_summary_limit =
      base().properties.Get(TableProperties::kWritePartitionSummaryLimit);
  summary_.SetPartitionSummaryLimit(partition_summary_limit);
  return summary_.Build();
}
void FastAppend::CleanUncommitted(const std::unordered_set<std::string>& committed) {
// Clean up new manifests that were written but not committed
if (!new_manifests_.empty()) {
for (const auto& manifest : new_manifests_) {
if (!committed.contains(manifest.manifest_path)) {
std::ignore = DeleteFile(manifest.manifest_path);
}
}
new_manifests_.clear();
}
// Clean up only rewritten append manifests as they are always owned by the table
// Don't clean up append manifests as they are added to the manifest list and are
// not compacted
if (!rewritten_append_manifests_.empty()) {
for (const auto& manifest : rewritten_append_manifests_) {
if (!committed.contains(manifest.manifest_path)) {
std::ignore = DeleteFile(manifest.manifest_path);
}
}
}
}
// Reports whether cleanup should run after a commit.
//
// For FastAppend, cleanup is needed only when rewritten_append_manifests_ is
// non-empty, because:
//   1. manifests appended as-is are never rewritten, and
//   2. manifests written for AppendFile() are already cleaned up between commit
//      attempts by WriteNewManifests().
bool FastAppend::CleanupAfterCommit() const {
  const bool has_rewritten_manifests = !rewritten_append_manifests_.empty();
  return has_rewritten_manifests;
}
// Looks up the partition spec with the given ID in the base table metadata and returns
// whatever TableMetadata::PartitionSpecById() reports for it.
Result<std::shared_ptr<PartitionSpec>> FastAppend::Spec(int32_t spec_id) {
  const TableMetadata& metadata = base();
  return metadata.PartitionSpecById(spec_id);
}
// Produces a copy of `manifest` written to a fresh path and tagged with this update's
// snapshot ID.
//
// The schema and the manifest's partition spec are resolved from the base metadata; a
// failure in either lookup is propagated before any path or snapshot ID is requested.
// The summary builder is passed to CopyAppendManifest() by pointer.
// NOTE(review): presumably CopyAppendManifest updates the summary with the copied
// entries -- confirm in manifest_util_internal.h.
Result<ManifestFile> FastAppend::CopyManifest(const ManifestFile& manifest) {
  const TableMetadata& metadata = base();
  ICEBERG_ASSIGN_OR_RAISE(auto table_schema, metadata.Schema());
  ICEBERG_ASSIGN_OR_RAISE(auto partition_spec,
                          metadata.PartitionSpecById(manifest.partition_spec_id));

  // Kept as separate statements, path first and snapshot ID second, so the two calls
  // happen in a fixed order rather than in unspecified argument-evaluation order.
  std::string target_path = ManifestPath();
  int64_t assigned_snapshot_id = SnapshotId();

  // Copy the manifest with the new snapshot ID.
  return CopyAppendManifest(manifest, ctx_->table->io(), table_schema, partition_spec,
                            assigned_snapshot_id, target_path, metadata.format_version,
                            &summary_);
}
// Returns the manifests covering the data files added through AppendFile(), writing
// them first when necessary.
//
// * If files were added since the last successful write (has_new_files_) and manifests
//   from an earlier write exist, those manifests are deleted (best-effort) and
//   forgotten.
// * If no manifests are held and data files exist, one batch of manifests is written
//   per partition spec and has_new_files_ is reset.
// * Otherwise the manifests already held are returned unchanged.
//
// A failure while resolving a spec or writing a batch is propagated immediately.
// Batches written before the failure stay in new_manifests_ and has_new_files_ is not
// reset, so the next call discards them via the first step.
Result<std::vector<ManifestFile>> FastAppend::WriteNewManifests() {
  const bool held_manifests_are_stale = has_new_files_ && !new_manifests_.empty();
  if (held_manifests_are_stale) {
    for (const auto& stale : new_manifests_) {
      std::ignore = DeleteFile(stale.manifest_path);
    }
    new_manifests_.clear();
  }

  const bool needs_write = new_manifests_.empty() && !new_data_files_by_spec_.empty();
  if (!needs_write) {
    return new_manifests_;
  }

  for (const auto& [spec_id, files] : new_data_files_by_spec_) {
    ICEBERG_ASSIGN_OR_RAISE(auto partition_spec, Spec(spec_id));
    ICEBERG_ASSIGN_OR_RAISE(auto batch,
                            WriteDataManifests(files.as_span(), partition_spec));
    for (auto& written : batch) {
      new_manifests_.push_back(std::move(written));
    }
  }
  has_new_files_ = false;
  return new_manifests_;
}
} // namespace iceberg