blob: 1f35781fafbbc98824c81b561e5baae5832acfd2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "iceberg/update/update_schema.h"
#include <format>
#include <memory>
#include <optional>
#include <ranges>
#include <string>
#include <string_view>
#include <unordered_set>
#include <utility>
#include <vector>
#include "iceberg/json_serde_internal.h"
#include "iceberg/name_mapping.h"
#include "iceberg/schema.h"
#include "iceberg/schema_field.h"
#include "iceberg/table_metadata.h"
#include "iceberg/table_properties.h"
#include "iceberg/transaction.h"
#include "iceberg/type.h"
#include "iceberg/util/checked_cast.h"
#include "iceberg/util/error_collector.h"
#include "iceberg/util/macros.h"
#include "iceberg/util/string_util.h"
#include "iceberg/util/type_util.h"
#include "iceberg/util/visit_type.h"
namespace iceberg {
namespace {
/// \brief Sentinel parent id used for the table's root struct.
///
/// Top-level column additions and moves are recorded under this id, and it is
/// also the parent id passed when the root schema is visited.
constexpr int32_t kTableRootId = -1;
/// \brief Visitor for applying schema changes recursively to nested types
class ApplyChangesVisitor {
public:
ApplyChangesVisitor(
const std::unordered_set<int32_t>& deletes,
const std::unordered_map<int32_t, std::shared_ptr<SchemaField>>& updates,
const std::unordered_map<int32_t, std::vector<int32_t>>& parent_to_added_ids,
const std::unordered_map<int32_t, std::vector<UpdateSchema::Move>>& moves)
: deletes_(deletes),
updates_(updates),
parent_to_added_ids_(parent_to_added_ids),
moves_(moves) {}
/// \brief Apply changes to a type using schema visitor pattern
Result<std::shared_ptr<Type>> ApplyChanges(const std::shared_ptr<Type>& type,
int32_t parent_id) {
return VisitTypeCategory(*type, this, type, parent_id);
}
/// \brief Apply changes to a struct type
Result<std::shared_ptr<Type>> VisitStruct(const StructType& struct_type,
const std::shared_ptr<Type>& base_type,
int32_t parent_id) {
std::vector<SchemaField> new_fields;
bool has_changes = false;
for (const auto& field : struct_type.fields()) {
ICEBERG_ASSIGN_OR_RAISE(auto field_type_result,
ApplyChanges(field.type(), field.field_id()));
ICEBERG_ASSIGN_OR_RAISE(auto processed_field,
ProcessField(field, field_type_result));
if (processed_field.has_value()) {
const auto& new_field = processed_field.value();
new_fields.push_back(new_field);
if (new_field != field) {
has_changes = true;
}
} else {
has_changes = true;
}
}
auto adds_it = parent_to_added_ids_.find(parent_id);
if (adds_it != parent_to_added_ids_.end() && !adds_it->second.empty()) {
has_changes = true;
for (int32_t added_id : adds_it->second) {
auto added_field_it = updates_.find(added_id);
if (added_field_it != updates_.end()) {
new_fields.push_back(*added_field_it->second);
}
}
}
auto moves_it = moves_.find(parent_id);
if (moves_it != moves_.end() && !moves_it->second.empty()) {
has_changes = true;
new_fields = MoveFields(std::move(new_fields), moves_it->second);
}
if (!has_changes) {
return base_type;
}
return std::make_shared<StructType>(std::move(new_fields));
}
/// \brief Apply changes to a list type
Result<std::shared_ptr<Type>> VisitList(const ListType& list_type,
const std::shared_ptr<Type>& base_type,
int32_t parent_id) {
const auto& element = list_type.element();
ICEBERG_ASSIGN_OR_RAISE(auto element_type_result,
ApplyChanges(element.type(), element.field_id()));
ICEBERG_ASSIGN_OR_RAISE(auto processed_element,
ProcessField(element, element_type_result));
ICEBERG_CHECK(processed_element.has_value(),
"Cannot delete element field from list: {}", list_type.ToString());
const auto& new_element = processed_element.value();
if (element == new_element) {
return base_type;
}
return std::make_shared<ListType>(new_element);
}
/// \brief Apply changes to a map type
Result<std::shared_ptr<Type>> VisitMap(const MapType& map_type,
const std::shared_ptr<Type>& base_type,
int32_t parent_id) {
const auto& key = map_type.key();
const auto& value = map_type.value();
int32_t key_id = key.field_id();
ICEBERG_CHECK(!deletes_.contains(key_id), "Cannot delete map keys");
ICEBERG_CHECK(!updates_.contains(key_id), "Cannot update map keys");
ICEBERG_CHECK(!parent_to_added_ids_.contains(key_id),
"Cannot add fields to map keys");
ICEBERG_ASSIGN_OR_RAISE(auto key_type_result, ApplyChanges(key.type(), key_id));
ICEBERG_ASSIGN_OR_RAISE(auto value_type_result,
ApplyChanges(value.type(), value.field_id()));
ICEBERG_CHECK(*key_type_result == *key.type(), "Cannot alter map keys");
ICEBERG_ASSIGN_OR_RAISE(auto processed_value, ProcessField(value, value_type_result));
ICEBERG_CHECK(processed_value.has_value(), "Cannot delete value field from map: {}",
map_type.ToString());
const auto& new_value = processed_value.value();
if (key == map_type.key() && value == new_value) {
return base_type;
}
return std::make_shared<MapType>(key, new_value);
}
Result<std::shared_ptr<Type>> VisitPrimitive(const PrimitiveType& primitive_type,
const std::shared_ptr<Type>& base_type,
int32_t parent_id) {
return base_type;
}
private:
Result<std::optional<SchemaField>> ProcessField(
const SchemaField& field, const std::shared_ptr<Type>& field_type_result) {
int32_t field_id = field.field_id();
if (deletes_.contains(field_id)) {
return std::nullopt;
}
std::shared_ptr<Type> result_type = field_type_result;
// Note: We check the update against the ORIGINAL field type, not the recursively
// processed type, because we want to preserve nested changes from recursion
auto update_it = updates_.find(field_id);
if (update_it != updates_.end()) {
const auto& update_field = update_it->second;
if (update_field->type() != field.type()) {
result_type = update_field->type();
}
}
// Note: Child field additions are handled in VisitStruct, not here.
// The recursively processed type (field_type_result) already contains
// any child fields that were added.
if (update_it != updates_.end()) {
const auto& update_field = update_it->second;
return SchemaField(field_id, update_field->name(), std::move(result_type),
update_field->optional(), update_field->doc());
} else if (result_type != field.type()) {
return SchemaField(field_id, field.name(), std::move(result_type), field.optional(),
field.doc());
} else {
return field;
}
}
/// \brief Helper function to apply move operations to a list of fields
static std::vector<SchemaField> MoveFields(
std::vector<SchemaField>&& fields, const std::vector<UpdateSchema::Move>& moves);
const std::unordered_set<int32_t>& deletes_;
const std::unordered_map<int32_t, std::shared_ptr<SchemaField>>& updates_;
const std::unordered_map<int32_t, std::vector<int32_t>>& parent_to_added_ids_;
const std::unordered_map<int32_t, std::vector<UpdateSchema::Move>>& moves_;
};
/// \brief Reorder `fields` by applying `moves` one after another.
///
/// A move whose field is not present in `fields` is ignored. A kBefore/kAfter
/// move whose reference field cannot be found (for example because it was
/// removed from the struct before moves are applied) leaves the moved field at
/// its original position instead of dropping it from the result.
///
/// \param fields The fields of one struct, consumed by this call.
/// \param moves The moves registered for that struct, in application order.
/// \return The reordered fields; always the same set of fields as the input.
std::vector<SchemaField> ApplyChangesVisitor::MoveFields(
    std::vector<SchemaField>&& fields, const std::vector<UpdateSchema::Move>& moves) {
  std::vector<SchemaField> reordered = std::move(fields);

  const auto find_by_id = [&reordered](int32_t wanted_id) {
    return std::ranges::find_if(reordered, [wanted_id](const SchemaField& field) {
      return field.field_id() == wanted_id;
    });
  };

  for (const auto& move : moves) {
    const auto source_it = find_by_id(move.field_id);
    if (source_it == reordered.end()) {
      continue;
    }

    // Remember where the field was so it can be restored if the move cannot be
    // resolved; erasing first keeps reference lookups free of the moved field.
    const auto original_offset = source_it - reordered.begin();
    SchemaField to_move = std::move(*source_it);
    reordered.erase(source_it);

    auto insert_pos = reordered.begin() + original_offset;
    switch (move.type) {
      case UpdateSchema::Move::MoveType::kFirst:
        insert_pos = reordered.begin();
        break;
      case UpdateSchema::Move::MoveType::kBefore: {
        const auto reference_it = find_by_id(move.reference_field_id);
        if (reference_it != reordered.end()) {
          insert_pos = reference_it;
        }
        break;
      }
      case UpdateSchema::Move::MoveType::kAfter: {
        const auto reference_it = find_by_id(move.reference_field_id);
        if (reference_it != reordered.end()) {
          insert_pos = reference_it + 1;
        }
        break;
      }
    }
    // Always re-insert: previously an unresolved reference silently discarded
    // the field that had just been erased.
    reordered.insert(insert_pos, std::move(to_move));
  }
  return reordered;
}
} // namespace
/// \brief Factory for UpdateSchema.
///
/// \param ctx Transaction context the update operates on; must not be null
///        (enforced through ICEBERG_PRECHECK).
/// \return A shared pointer to the newly constructed UpdateSchema.
Result<std::shared_ptr<UpdateSchema>> UpdateSchema::Make(
    std::shared_ptr<TransactionContext> ctx) {
  ICEBERG_PRECHECK(ctx != nullptr, "Cannot create UpdateSchema without context");
  // NOTE(review): constructed with `new` rather than make_shared, presumably
  // because the constructor is not publicly accessible - confirm in the header.
  std::shared_ptr<UpdateSchema> update(new UpdateSchema(std::move(ctx)));
  return update;
}
/// \brief Initialise the update from the current table metadata.
///
/// Loads the current schema, the last assigned column id, the identifier field
/// names and the child-to-parent id index. If the schema or the identifier names
/// cannot be obtained, the error is recorded with AddError and the remaining
/// state is left at its defaults.
UpdateSchema::UpdateSchema(std::shared_ptr<TransactionContext> ctx)
    : PendingUpdate(std::move(ctx)) {
  const TableMetadata& metadata = ctx_->current();

  auto current_schema = metadata.Schema();
  if (!current_schema.has_value()) {
    AddError(current_schema.error());
    return;
  }
  schema_ = std::move(current_schema.value());
  last_column_id_ = metadata.last_column_id;

  auto identifier_names = schema_->IdentifierFieldNames();
  if (identifier_names.has_value()) {
    identifier_field_names_ = std::move(identifier_names.value());
    id_to_parent_ = IndexParents(*schema_);
  } else {
    AddError(identifier_names.error());
  }
}
/// \brief Destructor, defaulted out of line in this translation unit.
UpdateSchema::~UpdateSchema() = default;
/// \brief Build a move that places `field_id` first in its struct.
///
/// A kFirst move has no reference field, so kTableRootId is stored as a
/// placeholder reference id.
UpdateSchema::Move UpdateSchema::Move::First(int32_t field_id) {
  Move move{.field_id = field_id,
            .reference_field_id = kTableRootId,
            .type = MoveType::kFirst};
  return move;
}
/// \brief Build a move that places `field_id` directly before `reference_field_id`.
UpdateSchema::Move UpdateSchema::Move::Before(int32_t field_id,
                                              int32_t reference_field_id) {
  Move move{.field_id = field_id,
            .reference_field_id = reference_field_id,
            .type = MoveType::kBefore};
  return move;
}
/// \brief Build a move that places `field_id` directly after `reference_field_id`.
UpdateSchema::Move UpdateSchema::Move::After(int32_t field_id,
                                             int32_t reference_field_id) {
  Move move{.field_id = field_id,
            .reference_field_id = reference_field_id,
            .type = MoveType::kAfter};
  return move;
}
/// \brief Permit changes that are otherwise rejected as incompatible, such as
/// adding a required column or turning an optional column into a required one.
///
/// \return This builder, for chaining.
UpdateSchema& UpdateSchema::AllowIncompatibleChanges() {
  this->allow_incompatible_changes_ = true;
  return *this;
}
/// \brief Choose whether column names are resolved case-sensitively.
///
/// \param case_sensitive True to match names exactly; false to ignore case.
/// \return This builder, for chaining.
UpdateSchema& UpdateSchema::CaseSensitive(bool case_sensitive) {
  this->case_sensitive_ = case_sensitive;
  return *this;
}
/// \brief Add an optional top-level column.
///
/// Names containing '.' are rejected as ambiguous; nested columns must be added
/// through the overload that takes a parent.
///
/// \param name Name of the new column.
/// \param type Type of the new column.
/// \param doc Documentation string for the new column.
/// \return This builder, for chaining.
UpdateSchema& UpdateSchema::AddColumn(std::string_view name, std::shared_ptr<Type> type,
                                      std::string_view doc) {
  const bool has_dot = name.contains('.');
  ICEBERG_BUILDER_CHECK(!has_dot,
                        "Cannot add column with ambiguous name: {}, use "
                        "AddColumn(parent, name, type, doc)",
                        name);
  constexpr bool kOptional = true;
  return AddColumnInternal(std::nullopt, name, kOptional, std::move(type), doc);
}
/// \brief Add an optional column, optionally nested under `parent`.
///
/// \param parent Name of the parent column, or std::nullopt for a top-level column.
/// \param name Name of the new column.
/// \param type Type of the new column.
/// \param doc Documentation string for the new column.
/// \return This builder, for chaining.
UpdateSchema& UpdateSchema::AddColumn(std::optional<std::string_view> parent,
                                      std::string_view name, std::shared_ptr<Type> type,
                                      std::string_view doc) {
  constexpr bool kOptional = true;
  return AddColumnInternal(std::move(parent), name, kOptional, std::move(type), doc);
}
/// \brief Add a required top-level column.
///
/// Names containing '.' are rejected as ambiguous; nested columns must be added
/// through the overload that takes a parent.
///
/// \param name Name of the new column.
/// \param type Type of the new column.
/// \param doc Documentation string for the new column.
/// \return This builder, for chaining.
UpdateSchema& UpdateSchema::AddRequiredColumn(std::string_view name,
                                              std::shared_ptr<Type> type,
                                              std::string_view doc) {
  const bool has_dot = name.contains('.');
  ICEBERG_BUILDER_CHECK(!has_dot,
                        "Cannot add column with ambiguous name: {}, use "
                        "AddRequiredColumn(parent, name, type, doc)",
                        name);
  constexpr bool kOptional = false;
  return AddColumnInternal(std::nullopt, name, kOptional, std::move(type), doc);
}
/// \brief Add a required column, optionally nested under `parent`.
///
/// \param parent Name of the parent column, or std::nullopt for a top-level column.
/// \param name Name of the new column.
/// \param type Type of the new column.
/// \param doc Documentation string for the new column.
/// \return This builder, for chaining.
UpdateSchema& UpdateSchema::AddRequiredColumn(std::optional<std::string_view> parent,
                                              std::string_view name,
                                              std::shared_ptr<Type> type,
                                              std::string_view doc) {
  constexpr bool kOptional = false;
  return AddColumnInternal(std::move(parent), name, kOptional, std::move(type), doc);
}
/// \brief Change the type of an existing (or pending) column to `new_type`.
///
/// The column must exist and must not be scheduled for deletion. The call is a
/// no-op when the type is unchanged; otherwise the change must pass
/// IsPromotionAllowed.
///
/// \param name Name of the column to update.
/// \param new_type The new primitive type.
/// \return This builder, for chaining.
UpdateSchema& UpdateSchema::UpdateColumn(std::string_view name,
                                         std::shared_ptr<PrimitiveType> new_type) {
  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto target, FindFieldForUpdate(name));
  ICEBERG_BUILDER_CHECK(target.has_value(), "Cannot update missing column: {}", name);

  const SchemaField& current = target->get();
  const int32_t id = current.field_id();
  ICEBERG_BUILDER_CHECK(!deletes_.contains(id),
                        "Cannot update a column that will be deleted: {}", current.name());

  const bool same_type = (*current.type() == *new_type);
  if (!same_type) {
    ICEBERG_BUILDER_CHECK(IsPromotionAllowed(current.type(), new_type),
                          "Cannot change column type: {}: {} -> {}", name,
                          current.type()->ToString(), new_type->ToString());
    // The replacement is built from `current` before the map slot is overwritten.
    updates_[id] = std::make_shared<SchemaField>(
        current.field_id(), current.name(), new_type, current.optional(), current.doc());
  }
  return *this;
}
/// \brief Replace the documentation string of an existing (or pending) column.
///
/// The column must exist and must not be scheduled for deletion. The call is a
/// no-op when the doc is unchanged.
///
/// \param name Name of the column to update.
/// \param new_doc The new documentation string.
/// \return This builder, for chaining.
UpdateSchema& UpdateSchema::UpdateColumnDoc(std::string_view name,
                                            std::string_view new_doc) {
  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto target, FindFieldForUpdate(name));
  ICEBERG_BUILDER_CHECK(target.has_value(), "Cannot update missing column: {}", name);

  const SchemaField& current = target->get();
  const int32_t id = current.field_id();
  ICEBERG_BUILDER_CHECK(!deletes_.contains(id),
                        "Cannot update a column that will be deleted: {}", current.name());

  const bool doc_changed = !(current.doc() == new_doc);
  if (doc_changed) {
    // The replacement is built from `current` before the map slot is overwritten.
    updates_[id] =
        std::make_shared<SchemaField>(current.field_id(), current.name(), current.type(),
                                      current.optional(), std::string(new_doc));
  }
  return *this;
}
/// \brief Rename an existing column.
///
/// The column is looked up in the current schema, must not be scheduled for
/// deletion, and `new_name` must be non-empty. Any pending update for the column
/// is preserved and only its name is replaced. If `name` is listed as an
/// identifier field, that entry is renamed as well.
///
/// \param name Current name of the column.
/// \param new_name New name for the column.
/// \return This builder, for chaining.
UpdateSchema& UpdateSchema::RenameColumn(std::string_view name,
                                         std::string_view new_name) {
  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto target, FindField(name));
  ICEBERG_BUILDER_CHECK(target.has_value(), "Cannot rename missing column: {}", name);
  ICEBERG_BUILDER_CHECK(!new_name.empty(), "Cannot rename a column to null");

  const SchemaField& existing = target->get();
  const int32_t id = existing.field_id();
  ICEBERG_BUILDER_CHECK(!deletes_.contains(id),
                        "Cannot rename a column that will be deleted: {}",
                        existing.name());

  // Start from the pending update when there is one, so earlier changes are kept.
  const auto pending = updates_.find(id);
  const SchemaField& source = pending == updates_.end() ? existing : *pending->second;
  updates_[id] = std::make_shared<SchemaField>(source.field_id(), std::string(new_name),
                                               source.type(), source.optional(),
                                               source.doc());

  if (auto identifier = std::ranges::find(identifier_field_names_, name);
      identifier != identifier_field_names_.end()) {
    *identifier = new_name;
  }
  return *this;
}
/// \brief Mark the column `name` as optional.
///
/// \return This builder, for chaining.
UpdateSchema& UpdateSchema::MakeColumnOptional(std::string_view name) {
  constexpr bool kOptional = true;
  return UpdateColumnRequirementInternal(name, kOptional);
}
/// \brief Mark the column `name` as required.
///
/// \return This builder, for chaining.
UpdateSchema& UpdateSchema::RequireColumn(std::string_view name) {
  constexpr bool kOptional = false;
  return UpdateColumnRequirementInternal(name, kOptional);
}
/// \brief Shared implementation of MakeColumnOptional and RequireColumn.
///
/// No-op when the column already has the requested nullability. Changing a
/// column from optional to required is rejected unless incompatible changes
/// were allowed. The column must not be scheduled for deletion.
///
/// \param name Name of the column to update.
/// \param is_optional Desired nullability.
/// \return This builder, for chaining.
UpdateSchema& UpdateSchema::UpdateColumnRequirementInternal(std::string_view name,
                                                            bool is_optional) {
  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto target, FindFieldForUpdate(name));
  ICEBERG_BUILDER_CHECK(target.has_value(), "Cannot update missing column: {}", name);

  const SchemaField& current = target->get();
  if (current.optional() == is_optional) {
    return *this;
  }

  // TODO(GuotaoYu): support added column with default value
  // bool is_defaulted_add = IsAdded(name) && field.initial_default() != null;
  const bool is_defaulted_add = false;
  ICEBERG_BUILDER_CHECK(is_optional || is_defaulted_add || allow_incompatible_changes_,
                        "Cannot change column nullability: {}: optional -> required",
                        name);

  const int32_t id = current.field_id();
  ICEBERG_BUILDER_CHECK(!deletes_.contains(id),
                        "Cannot update a column that will be deleted: {}", current.name());

  // Build the adjusted field before the map slot is overwritten, because
  // `current` may refer to the pending update stored in that slot.
  SchemaField adjusted = is_optional ? current.AsOptional() : current.AsRequired();
  updates_[id] = std::make_shared<SchemaField>(std::move(adjusted));
  return *this;
}
/// \brief Schedule an existing column for deletion.
///
/// The column must exist in the current schema and must not already have
/// pending child additions or pending updates.
///
/// \param name Name of the column to delete.
/// \return This builder, for chaining.
UpdateSchema& UpdateSchema::DeleteColumn(std::string_view name) {
  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto target, FindField(name));
  ICEBERG_BUILDER_CHECK(target.has_value(), "Cannot delete missing column: {}", name);

  const int32_t id = target->get().field_id();
  const bool has_additions = parent_to_added_ids_.contains(id);
  ICEBERG_BUILDER_CHECK(!has_additions, "Cannot delete a column that has additions: {}",
                        name);
  const bool has_updates = updates_.contains(id);
  ICEBERG_BUILDER_CHECK(!has_updates, "Cannot delete a column that has updates: {}",
                        name);

  deletes_.insert(id);
  return *this;
}
/// \brief Move the column `name` to the first position of its struct.
///
/// \return This builder, for chaining.
UpdateSchema& UpdateSchema::MoveFirst(std::string_view name) {
  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto moved_id, FindFieldIdForMove(name));
  const Move request = Move::First(moved_id);
  return MoveInternal(name, request);
}
/// \brief Move the column `name` directly before the column `before_name`.
///
/// Both columns must resolve (existing or newly added) and must be different.
///
/// \return This builder, for chaining.
UpdateSchema& UpdateSchema::MoveBefore(std::string_view name,
                                       std::string_view before_name) {
  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto moved_id, FindFieldIdForMove(name));
  ICEBERG_BUILDER_ASSIGN_OR_RETURN_WITH_ERROR(
      auto target_id, FindFieldIdForMove(before_name),
      "Cannot move {} before missing column: {}", name, before_name);
  ICEBERG_BUILDER_CHECK(moved_id != target_id, "Cannot move {} before itself", name);
  const Move request = Move::Before(moved_id, target_id);
  return MoveInternal(name, request);
}
/// \brief Move the column `name` directly after the column `after_name`.
///
/// Both columns must resolve (existing or newly added) and must be different.
///
/// \return This builder, for chaining.
UpdateSchema& UpdateSchema::MoveAfter(std::string_view name,
                                      std::string_view after_name) {
  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto moved_id, FindFieldIdForMove(name));
  ICEBERG_BUILDER_ASSIGN_OR_RETURN_WITH_ERROR(
      auto target_id, FindFieldIdForMove(after_name),
      "Cannot move {} after missing column: {}", name, after_name);
  ICEBERG_BUILDER_CHECK(moved_id != target_id, "Cannot move {} after itself", name);
  const Move request = Move::After(moved_id, target_id);
  return MoveInternal(name, request);
}
/// \brief Not implemented yet: records a NotImplemented error and ignores
/// `new_schema`.
///
/// \return This builder, for chaining.
UpdateSchema& UpdateSchema::UnionByNameWith(std::shared_ptr<Schema> new_schema) {
  // TODO(Guotao Yu): Implement UnionByNameWith
  auto not_implemented = NotImplemented("UpdateSchema::UnionByNameWith not implemented");
  AddError(not_implemented);
  return *this;
}
/// \brief Replace the identifier field names with copies of `names`.
///
/// The names are only stored here; they are resolved against the resulting
/// schema in Apply().
///
/// \return This builder, for chaining.
UpdateSchema& UpdateSchema::SetIdentifierFields(
    const std::span<std::string_view>& names) {
  std::vector<std::string> replacement;
  replacement.reserve(names.size());
  for (const std::string_view identifier : names) {
    replacement.emplace_back(identifier);
  }
  identifier_field_names_ = std::move(replacement);
  return *this;
}
/// \brief Validate the pending changes and build the resulting schema.
///
/// Steps, in order:
///  1. Fail if any builder error was recorded.
///  2. Reject deletion of an identifier field or of any of its ancestors.
///  3. Apply deletes, updates, additions and moves to the current schema.
///  4. Resolve the identifier field names against the new schema.
///  5. If the table has a non-empty default name mapping property, compute its
///     updated JSON value.
///
/// \return The new schema, the last assigned column id and the updated properties.
Result<UpdateSchema::ApplyResult> UpdateSchema::Apply() {
  ICEBERG_RETURN_UNEXPECTED(CheckErrors());

  for (const auto& identifier_name : identifier_field_names_) {
    ICEBERG_ASSIGN_OR_RAISE(auto existing, FindField(identifier_name));
    if (!existing.has_value()) {
      // Not in the current schema; it may be an added column, checked below.
      continue;
    }
    const auto identifier_id = existing->get().field_id();
    ICEBERG_CHECK(!deletes_.contains(identifier_id),
                  "Cannot delete identifier field {}. To force deletion, also call "
                  "SetIdentifierFields to update identifier fields.",
                  identifier_name);

    // Walk up the parent chain: deleting any ancestor would delete the field too.
    for (auto ancestor = id_to_parent_.find(identifier_id);
         ancestor != id_to_parent_.end();
         ancestor = id_to_parent_.find(ancestor->second)) {
      const int32_t ancestor_id = ancestor->second;
      ICEBERG_CHECK(
          !deletes_.contains(ancestor_id),
          "Cannot delete field with id {} as it will delete nested identifier field {}",
          ancestor_id, identifier_name);
    }
  }

  ApplyChangesVisitor visitor(deletes_, updates_, parent_to_added_ids_, moves_);
  ICEBERG_ASSIGN_OR_RAISE(auto applied_type,
                          visitor.ApplyChanges(schema_, kTableRootId));
  auto applied_struct = internal::checked_pointer_cast<StructType>(applied_type);
  auto staged_schema = applied_struct->ToSchema();

  std::vector<int32_t> fresh_identifier_ids;
  for (const auto& identifier_name : identifier_field_names_) {
    ICEBERG_ASSIGN_OR_RAISE(
        auto resolved, staged_schema->FindFieldByName(identifier_name, case_sensitive_));
    ICEBERG_CHECK(resolved.has_value(),
                  "Cannot add field {} as an identifier field: not found in current "
                  "schema or added columns",
                  identifier_name);
    fresh_identifier_ids.push_back(resolved->get().field_id());
  }

  auto staged_fields =
      std::ranges::to<std::vector<SchemaField>>(staged_schema->fields());
  ICEBERG_ASSIGN_OR_RAISE(auto new_schema,
                          Schema::Make(std::move(staged_fields), schema_->schema_id(),
                                       fresh_identifier_ids));

  std::unordered_map<std::string, std::string> updated_props;
  const auto& base_metadata = base();
  const auto& properties = base_metadata.properties.configs();
  const std::string mapping_key(TableProperties::kDefaultNameMapping);
  const auto mapping_it = properties.find(mapping_key);
  if (mapping_it != properties.end() && !mapping_it->second.empty()) {
    // Flatten parent -> [children] into (parent, child) pairs.
    std::multimap<int32_t, int32_t> adds;
    for (const auto& [parent_id, child_ids] : parent_to_added_ids_) {
      for (const int32_t child_id : child_ids) {
        adds.emplace(parent_id, child_id);
      }
    }
    ICEBERG_ASSIGN_OR_RAISE(
        auto updated_mapping_json,
        UpdateMappingFromJsonString(mapping_it->second, updates_, adds));
    updated_props[mapping_key] = std::move(updated_mapping_json);
  }

  return ApplyResult{.schema = std::move(new_schema),
                     .new_last_column_id = last_column_id_,
                     .updated_props = std::move(updated_props)};
}
// TODO(Guotao Yu): v3 default value is not yet supported
/// \brief Shared implementation of the AddColumn / AddRequiredColumn overloads.
///
/// Resolves the struct the column is added to (the table root, a struct column,
/// a map's value struct or a list's element struct), rejects name clashes with
/// existing non-deleted columns, rejects required columns unless incompatible
/// changes are allowed, then assigns fresh ids and records the addition.
///
/// \param parent Name of the parent column, or std::nullopt for a top-level column.
/// \param name Name of the new column.
/// \param is_optional Whether the new column is optional.
/// \param type Type of the new column; nested ids are reassigned.
/// \param doc Documentation string for the new column.
/// \return This builder, for chaining.
UpdateSchema& UpdateSchema::AddColumnInternal(std::optional<std::string_view> parent,
                                              std::string_view name, bool is_optional,
                                              std::shared_ptr<Type> type,
                                              std::string_view doc) {
  int32_t parent_id = kTableRootId;
  std::string full_name;

  if (!parent.has_value()) {
    ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto current_field, FindField(name));
    ICEBERG_BUILDER_CHECK(
        !current_field.has_value() || deletes_.contains(current_field->get().field_id()),
        "Cannot add column, name already exists: {}", name);
    full_name = std::string(name);
  } else {
    ICEBERG_BUILDER_CHECK(!parent->empty(), "Parent name cannot be empty");
    ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto parent_lookup, FindField(*parent));
    ICEBERG_BUILDER_CHECK(parent_lookup.has_value(), "Cannot find parent struct: {}",
                          *parent);

    // For maps and lists the column is added to the value / element struct.
    const SchemaField& parent_field = parent_lookup->get();
    const auto& parent_type = parent_field.type();
    const SchemaField* container = &parent_field;
    switch (parent_type->type_id()) {
      case TypeId::kMap:
        container = &internal::checked_cast<const MapType&>(*parent_type).value();
        break;
      case TypeId::kList:
        container = &internal::checked_cast<const ListType&>(*parent_type).element();
        break;
      default:
        break;
    }
    ICEBERG_BUILDER_CHECK(container->type()->type_id() == TypeId::kStruct,
                          "Cannot add to non-struct column: {}: {}", *parent,
                          container->type()->ToString());

    parent_id = container->field_id();
    ICEBERG_BUILDER_CHECK(!deletes_.contains(parent_id),
                          "Cannot add to a column that will be deleted: {}", *parent);

    auto current_name = std::format("{}.{}", *parent, name);
    ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto current_field, FindField(current_name));
    ICEBERG_BUILDER_CHECK(
        !current_field.has_value() || deletes_.contains(current_field->get().field_id()),
        "Cannot add column, name already exists: {}.{}", *parent, name);

    // Build full name using canonical name of parent
    ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto parent_name_opt,
                                     schema_->FindColumnNameById(parent_id));
    ICEBERG_BUILDER_CHECK(parent_name_opt.has_value(),
                          "Cannot find column name for parent id: {}", parent_id);
    full_name = std::format("{}.{}", *parent_name_opt, name);
  }

  ICEBERG_BUILDER_CHECK(
      is_optional || allow_incompatible_changes_,
      "Incompatible change: cannot add required column without a default value: {}",
      full_name);

  // The column's own id is assigned before the ids of its nested fields.
  const int32_t new_id = AssignNewColumnId();
  added_name_to_id_[CaseSensitivityAwareName(full_name)] = new_id;
  if (parent_id != kTableRootId) {
    id_to_parent_[new_id] = parent_id;
  }

  AssignFreshIdVisitor id_assigner([this]() { return AssignNewColumnId(); });
  auto type_with_fresh_ids = id_assigner.Visit(type);

  updates_[new_id] = std::make_shared<SchemaField>(new_id, std::string(name),
                                                   std::move(type_with_fresh_ids),
                                                   is_optional, std::string(doc));
  parent_to_added_ids_[parent_id].push_back(new_id);
  return *this;
}
/// \brief Advance the last assigned column id by one and return the new value.
int32_t UpdateSchema::AssignNewColumnId() {
  last_column_id_ += 1;
  return last_column_id_;
}
/// \brief Look up `name` in the current (base) schema, honouring the configured
/// case sensitivity. Pending additions and updates are not consulted.
Result<std::optional<std::reference_wrapper<const SchemaField>>> UpdateSchema::FindField(
    std::string_view name) const {
  const bool match_case = case_sensitive_;
  return schema_->FindFieldByName(name, match_case);
}
/// \brief Resolve `name` to the field that further updates should start from.
///
/// For a column of the current schema this is its pending update if one exists,
/// otherwise the schema field itself. For a column added in this update it is
/// the pending added field. Returns std::nullopt when the name is unknown.
Result<std::optional<std::reference_wrapper<const SchemaField>>>
UpdateSchema::FindFieldForUpdate(std::string_view name) const {
  using FieldRef = std::optional<std::reference_wrapper<const SchemaField>>;

  ICEBERG_ASSIGN_OR_RAISE(auto existing, FindField(name));
  if (existing.has_value()) {
    const auto pending = updates_.find(existing->get().field_id());
    if (pending == updates_.end()) {
      return existing;
    }
    return FieldRef(std::cref(*pending->second));
  }

  // Not in the base schema: fall back to columns added by this update.
  const auto added = added_name_to_id_.find(CaseSensitivityAwareName(name));
  if (added == added_name_to_id_.end()) {
    return std::nullopt;
  }
  const auto pending = updates_.find(added->second);
  if (pending == updates_.end()) {
    return std::nullopt;
  }
  return FieldRef(std::cref(*pending->second));
}
/// \brief Normalise `name` for use as a lookup key: unchanged when matching is
/// case-sensitive, lower-cased otherwise.
std::string UpdateSchema::CaseSensitivityAwareName(std::string_view name) const {
  if (!case_sensitive_) {
    return StringUtils::ToLower(name);
  }
  return std::string(name);
}
/// \brief Resolve `name` to a field id for a move operation.
///
/// Columns added by this update take precedence over columns of the current
/// schema. Returns an InvalidArgument error when the name is unknown.
Result<int32_t> UpdateSchema::FindFieldIdForMove(std::string_view name) const {
  if (const auto added = added_name_to_id_.find(CaseSensitivityAwareName(name));
      added != added_name_to_id_.end()) {
    return added->second;
  }

  ICEBERG_ASSIGN_OR_RAISE(auto existing, FindField(name));
  if (!existing.has_value()) {
    return InvalidArgument("Cannot move missing column: {}", name);
  }
  return existing->get().field_id();
}
/// \brief Validate a move and record it under the struct that owns the field.
///
/// A field with a known parent must live in a struct-typed parent, and for
/// kBefore/kAfter moves the reference field must share that parent. A field
/// without a known parent is treated as top-level, and its reference field must
/// be top-level as well.
///
/// \param name Name of the moved column, used in error messages only.
/// \param move The move to record.
/// \return This builder, for chaining.
UpdateSchema& UpdateSchema::MoveInternal(std::string_view name, const Move& move) {
  const bool has_reference =
      move.type == Move::MoveType::kBefore || move.type == Move::MoveType::kAfter;

  auto parent_it = id_to_parent_.find(move.field_id);
  if (parent_it != id_to_parent_.end()) {
    int32_t parent_id = parent_it->second;
    auto parent_field_result = schema_->FindFieldById(parent_id);
    // Check both layers: the lookup itself may fail, and a successful lookup may
    // still hold no field. Dereferencing an empty optional is undefined behavior.
    ICEBERG_BUILDER_CHECK(
        parent_field_result.has_value() && parent_field_result.value().has_value(),
        "Cannot find parent field with id: {}", parent_id);
    const auto& parent_field = parent_field_result.value()->get();
    ICEBERG_BUILDER_CHECK(parent_field.type()->type_id() == TypeId::kStruct,
                          "Cannot move fields in non-struct type");
    if (has_reference) {
      auto ref_parent_it = id_to_parent_.find(move.reference_field_id);
      ICEBERG_BUILDER_CHECK(
          ref_parent_it != id_to_parent_.end() && ref_parent_it->second == parent_id,
          "Cannot move field {} to a different struct", name);
    }
    moves_[parent_id].push_back(move);
  } else {
    if (has_reference) {
      auto ref_parent_it = id_to_parent_.find(move.reference_field_id);
      ICEBERG_BUILDER_CHECK(ref_parent_it == id_to_parent_.end(),
                            "Cannot move field {} to a different struct", name);
    }
    moves_[kTableRootId].push_back(move);
  }
  return *this;
}
} // namespace iceberg